diff options
author | 2016-06-21 14:38:00 -0700 | |
---|---|---|
committer | 2016-06-21 14:38:00 -0700 | |
commit | 4bcd135824dcf6e68d6827af757accf7e8297bdb (patch) | |
tree | 4b7d0c25e31709d360b543930b32da1bf7a4a91d /src/core/lib/surface | |
parent | 773a8825bcb9cae06af18263b112b8f8fd96f10d (diff) | |
parent | 5988716d9d6e33cd59631865527d73d3caa87387 (diff) |
Merge branch 'master' into fixes
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/alarm.c | 4 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 86 | ||||
-rw-r--r-- | src/core/lib/surface/call_log_batch.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 7 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 6 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 68 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 4 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 1 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 13 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 116 |
10 files changed, 197 insertions, 110 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 2cf2f00b31..aa9d60ee6a 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -48,9 +48,9 @@ struct grpc_alarm { static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, grpc_cq_completion *c) {} -static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_alarm *alarm = arg; - grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, success, + grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, do_nothing_end_completion, NULL, &alarm->completion); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index a9b1e25a77..04291b0ee0 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -100,6 +100,7 @@ typedef struct batch_control { grpc_closure finish_batch; void *notify_tag; gpr_refcount steps_to_complete; + grpc_error *error; uint8_t send_initial_metadata; uint8_t send_message; @@ -108,7 +109,6 @@ typedef struct batch_control { uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; - uint8_t success; } batch_control; struct grpc_call { @@ -221,9 +221,9 @@ static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, - bool success); + grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success); + grpc_error *error); grpc_call *grpc_call_create( grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, @@ -360,7 +360,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { +static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, + grpc_error *error) { size_t i; int ii; grpc_call *c = call; @@ -746,22 +747,23 @@ typedef struct termination_closure { enum { TC_CANCEL, TC_CLOSE } type; } termination_closure; -static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { +static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, + grpc_error *error) { termination_closure *tc = tcp; - if (tc->type == TC_CANCEL) { - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); - } - if (tc->type == TC_CLOSE) { - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); + switch (tc->type) { + case TC_CANCEL: + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); + break; + case TC_CLOSE: + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); + break; } gpr_slice_unref(tc->optional_message); - if (tc->op_closure != NULL) { - grpc_exec_ctx_enqueue(exec_ctx, tc->op_closure, true, NULL); - } + grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL); gpr_free(tc); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { +static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); @@ -772,7 +774,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { execute_op(exec_ctx, tc->call, &op); } -static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { +static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); @@ -803,7 +805,7 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, grpc_closure_init(&tc->closure, send_close, tc); GRPC_CALL_INTERNAL_REF(tc->call, "close"); } - grpc_exec_ctx_enqueue(exec_ctx, &tc->closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE, NULL); return GRPC_CALL_OK; } @@ -866,11 +868,11 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call *call = arg; gpr_mu_lock(&call->mu); call->have_alarm = 0; - if (success) { + if (error != GRPC_ERROR_CANCELLED) { cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED, "Deadline Exceeded"); } @@ -1056,7 +1058,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL); + /* unrefs bctl->error */ + grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -1064,7 +1067,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { - grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, + /* unrefs bctl->error */ + grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->error, finish_batch_completion, bctl, &bctl->cq_completion); } } @@ -1096,15 +1100,18 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, } static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { + grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - if (success) { + if (error == GRPC_ERROR_NONE) { gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); continue_receiving_slices(exec_ctx, bctl); } else { + if (grpc_trace_operation_failures) { + GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); + } grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); @@ -1153,15 +1160,15 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, } static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { + grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); - if (bctl->call->has_initial_md_been_received || !success || + if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); - process_data_after_md(exec_ctx, bctlp, success); + process_data_after_md(exec_ctx, bctlp, error); } else { call->saved_receiving_stream_ready_bctlp = bctlp; gpr_mu_unlock(&bctl->call->mu); @@ -1182,7 +1189,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) { gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); - gpr_log(GPR_ERROR, error_msg); + gpr_log(GPR_ERROR, "%s", error_msg); close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { @@ -1191,7 +1198,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, grpc_compression_algorithm_name(algo, &algo_name); gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); - gpr_log(GPR_ERROR, error_msg); + gpr_log(GPR_ERROR, "%s", error_msg); close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else { call->incoming_compression_algorithm = algo; @@ -1219,14 +1226,14 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, - void *bctlp, bool success) { + void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; gpr_mu_lock(&call->mu); - if (!success) { - bctl->success = false; + if (error != GRPC_ERROR_NONE) { + bctl->error = GRPC_ERROR_REF(error); } else { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1250,7 +1257,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL); + grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); } gpr_mu_unlock(&call->mu); @@ -1260,15 +1267,18 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } -static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { +static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, + grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; grpc_call *child_call; grpc_call *next_child_call; + GRPC_ERROR_REF(error); + gpr_mu_lock(&call->mu); if (bctl->send_initial_metadata) { - if (!success) { + if (error != GRPC_ERROR_NONE) { set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE); } grpc_metadata_batch_destroy( @@ -1314,13 +1324,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { call->final_op.server.cancelled); } - success = 1; + GRPC_ERROR_UNREF(error); + error = GRPC_ERROR_NONE; } - bctl->success = success != 0; + GRPC_ERROR_UNREF(bctl->error); + bctl->error = GRPC_ERROR_REF(error); gpr_mu_unlock(&call->mu); if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } + + GRPC_ERROR_UNREF(error); } static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, @@ -1350,7 +1364,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); - bctl->success = 1; + bctl->error = GRPC_ERROR_NONE; if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); } diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c index a6d1d5149f..31c074f15d 100644 --- a/src/core/lib/surface/call_log_batch.c +++ b/src/core/lib/surface/call_log_batch.c @@ -112,7 +112,7 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, size_t i; for (i = 0; i < nops; i++) { tmp = grpc_op_string(&ops[i]); - gpr_log(file, line, severity, "ops[%d]: %s", i, tmp); + gpr_log(file, line, severity, "ops[%" PRIuPTR "]: %s", i, tmp); gpr_free(tmp); } } diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index f0b3c2e15d..8936e7164f 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -83,7 +83,8 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) -static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *input_args, @@ -310,7 +311,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, } static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { + grpc_error *error) { grpc_channel *channel = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { @@ -336,7 +337,7 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); memset(&op, 0, sizeof(op)); - op.disconnect = 1; + op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); elem->filter->start_transport_op(&exec_ctx, elem, &op); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 5a50698695..9818f9d2f2 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -53,10 +53,10 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(arg); } -static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { ping_result *pr = arg; - grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr, - &pr->completion_storage); + grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, GRPC_ERROR_REF(error), ping_destroy, + pr, &pr->completion_storage); } void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5eb7cf1bf4..2cc6aa74e0 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -50,6 +50,8 @@ #include "src/core/lib/surface/event_string.h" #include "src/core/lib/surface/surface_trace.h" +int grpc_trace_operation_failures; + typedef struct { grpc_pollset_worker **worker; void *tag; @@ -92,7 +94,7 @@ static gpr_mu g_freelist_mu; static grpc_completion_queue *g_freelist; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, - bool success); + grpc_error *error); void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); } @@ -176,7 +178,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - bool success) { + grpc_error *error) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -219,7 +221,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, int success, + void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { @@ -231,16 +233,24 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, #endif GPR_TIMER_BEGIN("grpc_cq_end_op", 0); - GRPC_API_TRACE( - "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, " - "done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, success, done, done_arg, storage)); + if (grpc_api_trace || + (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { + const char *errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " + "done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + if (grpc_trace_operation_failures) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + grpc_error_free_string(errmsg); + } storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = - ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); + storage->next = ((uintptr_t)&cc->completed_head) | + ((uintptr_t)(error == GRPC_ERROR_NONE)); gpr_mu_lock(cc->mu); #ifndef NDEBUG @@ -267,8 +277,15 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + grpc_error *kick_error = + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(kick_error); + } } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -282,6 +299,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } GPR_TIMER_END("grpc_cq_end_op", 0); + + GRPC_ERROR_UNREF(error); } grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, @@ -347,8 +366,18 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, iteration_deadline); + if (err != GRPC_ERROR_NONE) { + gpr_mu_unlock(cc->mu); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } } } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); @@ -464,8 +493,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(cc->mu); } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, iteration_deadline); + if (err != GRPC_ERROR_NONE) { + del_plucker(cc, tag, &worker); + gpr_mu_unlock(cc->mu); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } } del_plucker(cc, tag, &worker); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 3d0dd13c53..b9dd1092b6 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -39,6 +39,8 @@ #include <grpc/grpc.h> #include "src/core/lib/iomgr/pollset.h" +extern int grpc_trace_operation_failures; + typedef struct grpc_cq_completion { /** user supplied tag */ void *tag; @@ -75,7 +77,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, int success, + void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 1c8b709015..f07039cb94 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -165,6 +165,7 @@ void grpc_init(void) { &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); grpc_register_tracer("compression", &grpc_compression_trace); + grpc_register_tracer("op_failure", &grpc_trace_operation_failures); grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 8223555382..5ea4cba5d1 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -80,7 +80,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_CREATE("lame client channel")); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -93,15 +94,17 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN); *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - op->on_connectivity_state_change->cb( - exec_ctx, op->on_connectivity_state_change->cb_arg, 1); + grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, + GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { - op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { - op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, 0); + grpc_exec_ctx_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("lame client channel"), NULL); } + GRPC_ERROR_UNREF(op->disconnect_with_error); } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 88d898bc1c..def6e5068b 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -230,9 +230,10 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success); +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, + grpc_error *error); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - size_t cq_idx, requested_call *rc); + size_t cq_idx, requested_call *rc, grpc_error *error); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server); @@ -263,14 +264,14 @@ struct shutdown_cleanup_args { }; static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status_ignored) { + grpc_error *error) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - int send_goaway, int send_disconnect) { + int send_goaway, grpc_error *send_disconnect) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -281,7 +282,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, sc->slice = gpr_slice_from_copied_string("Server shutdown"); op.goaway_message = &sc->slice; op.goaway_status = GRPC_STATUS_OK; - op.disconnect = send_disconnect; + op.disconnect_with_error = send_disconnect; grpc_closure_init(&sc->closure, shutdown_cleanup, sc); op.on_consumed = &sc->closure; @@ -292,14 +293,16 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, channel_broadcaster *cb, int send_goaway, - int force_disconnect) { + grpc_error *force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(exec_ctx, cb->channels[i], send_goaway, force_disconnect); + send_shutdown(exec_ctx, cb->channels[i], send_goaway, + GRPC_ERROR_REF(force_disconnect)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast"); } gpr_free(cb->channels); + GRPC_ERROR_UNREF(force_disconnect); } /* @@ -325,7 +328,8 @@ static void request_matcher_destroy(request_matcher *rm) { gpr_free(rm->requests_per_cq); } -static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { +static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, + grpc_error *error) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -340,20 +344,24 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } } static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, - request_matcher *rm) { + request_matcher *rm, + grpc_error *error) { int request_id; for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, &server->requested_calls[request_id]); + fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + GRPC_ERROR_REF(error)); } } + GRPC_ERROR_UNREF(error); } /* @@ -410,7 +418,7 @@ static void orphan_channel(channel_data *chand) { } static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, - bool success) { + grpc_error *error) { channel_data *chand = cd; grpc_server *server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); @@ -499,25 +507,26 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); channel_data *chand = elem->channel_data; server_ref(chand->server); - grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc, - &rc->completion); + grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, + done_request_event, rc, &rc->completion); } -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *call_elem = arg; call_data *calld = call_elem->call_data; channel_data *chand = call_elem->channel_data; request_matcher *rm = calld->request_matcher; grpc_server *server = rm->server; - if (!success || gpr_atm_acq_load(&server->shutdown_flag)) { + if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); return; } @@ -562,7 +571,8 @@ static void finish_start_new_rpc( calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); return; } @@ -570,7 +580,7 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, elem, true); + publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; @@ -658,18 +668,21 @@ static int num_channels(grpc_server *server) { } static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, - grpc_server *server) { + grpc_server *server, grpc_error *error) { if (server->started) { request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher); + &server->unregistered_request_matcher, + GRPC_ERROR_REF(error)); request_matcher_zombify_all_pending_calls( exec_ctx, &server->unregistered_request_matcher); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); + request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + GRPC_ERROR_REF(error)); request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); } } + GRPC_ERROR_UNREF(error); } static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, @@ -679,7 +692,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, return; } - kill_pending_work_locked(exec_ctx, server); + kill_pending_work_locked(exec_ctx, server, + GRPC_ERROR_CREATE("Server Shutdown")); if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { @@ -700,7 +714,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq, - server->shutdown_tags[i].tag, 1, done_shutdown_event, server, + server->shutdown_tags[i].tag, GRPC_ERROR_NONE, + done_shutdown_event, server, &server->shutdown_tags[i].completion); } } @@ -723,11 +738,12 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - bool success) { + grpc_error *error) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; + GRPC_ERROR_REF(error); grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem); op_deadline = calld->recv_initial_metadata->deadline; if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { @@ -736,11 +752,13 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->host && calld->path) { /* do nothing */ } else { - success = 0; + GRPC_ERROR_UNREF(error); + error = + GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } - calld->on_done_recv_initial_metadata->cb( - exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success); + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error, + NULL); } static void server_mutate_op(grpc_call_element *elem, @@ -765,10 +783,10 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - bool success) { + grpc_error *error) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - if (success) { + if (error == GRPC_ERROR_NONE) { start_new_rpc(exec_ctx, elem); } else { gpr_mu_lock(&calld->mu_state); @@ -776,7 +794,8 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -809,7 +828,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, - bool iomgr_status_ignored) { + grpc_error *error) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { @@ -1148,7 +1167,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; - op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0; + if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { + op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + } grpc_transport_perform_op(exec_ctx, transport, &op); } @@ -1159,7 +1180,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, } static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, - bool success) { + grpc_error *error) { grpc_server *server = s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; @@ -1181,8 +1202,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { - grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, + NULL, gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; } @@ -1205,7 +1226,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - kill_pending_work_locked(&exec_ctx, server); + kill_pending_work_locked(&exec_ctx, server, + GRPC_ERROR_CREATE("Server Shutdown")); gpr_mu_unlock(&server->mu_call); maybe_finish_shutdown(&exec_ctx, server); @@ -1233,7 +1255,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1); + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, + GRPC_ERROR_CREATE("Cancelling all calls")); grpc_exec_ctx_finish(&exec_ctx); } @@ -1280,13 +1303,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, request_matcher *rm = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(exec_ctx, server, cq_idx, rc); + fail_call(exec_ctx, server, cq_idx, rc, + GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, cq_idx, rc); + fail_call(exec_ctx, server, cq_idx, rc, + GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } switch (rc->type) { @@ -1314,8 +1339,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; @@ -1421,13 +1446,14 @@ done: } static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - size_t cq_idx, requested_call *rc) { + size_t cq_idx, requested_call *rc, grpc_error *error) { *rc->call = NULL; rc->initial_metadata->count = 0; + GPR_ASSERT(error != GRPC_ERROR_NONE); server_ref(server); - grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event, - rc, &rc->completion); + grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, + done_request_event, rc, &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { |