diff options
-rw-r--r-- | src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_common_posix.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 5 | ||||
-rw-r--r-- | src/core/lib/surface/channel_ping.c | 4 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 14 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 10 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 58 | ||||
-rw-r--r-- | src/core/lib/transport/connectivity_state.c | 16 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 19 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 3 |
10 files changed, 76 insertions, 61 deletions
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 698b2bef61..031cebb7c3 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -139,11 +139,12 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, on_accept, state); } -static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { +static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, + grpc_error *error) { grpc_server_secure_state *state = statep; if (state->destroy_callback != NULL) { state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, - success); + grpc_error_ref(error)); } grpc_server_security_connector_shutdown(exec_ctx, state->sc); state_unref(state); diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index 11ab090495..83941574df 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -88,8 +88,7 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { return GRPC_OS_ERROR(errno, "getsockopt(SO_NOSIGPIPE)"); } if ((newval != 0) == val) { - return grpc_error_set_str(GRPC_ERROR_CREATE(), GRPC_ERROR_STR_grpc_os_error, - "Failed to set SO_NOSIGPIPE"); + return GRPC_ERROR_CREATE("Failed to set SO_NOSIGPIPE"); } #endif return GRPC_ERROR_NONE; diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index b6b760b5d8..ac5e1fcc0c 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -80,7 +80,8 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) -static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *input_args, @@ -267,7 +268,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, } static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { + grpc_error *error) { grpc_channel *channel = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 5a50698695..3b4f1221f3 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -53,9 +53,9 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(arg); } -static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { ping_result *pr = arg; - grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr, + grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, error, ping_destroy, pr, &pr->completion_storage); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 1f82c3bad2..50e2a1bd16 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -89,7 +89,7 @@ static gpr_mu g_freelist_mu; static grpc_completion_queue *g_freelist; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, - bool success); + grpc_error *error); void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); } @@ -172,7 +172,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - bool success) { + grpc_error *error) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -215,7 +215,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, int success, + void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { @@ -228,15 +228,15 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_TIMER_BEGIN("grpc_cq_end_op", 0); GRPC_API_TRACE( - "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, " + "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%p, done=%p, " "done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, success, done, done_arg, storage)); + 7, (exec_ctx, cc, tag, error, done, done_arg, storage)); storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = - ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); + storage->next = ((uintptr_t)&cc->completed_head) | + ((uintptr_t)(error == GRPC_ERROR_NONE)); gpr_mu_lock(cc->mu); #ifndef NDEBUG diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index f50ec54cea..d6dfa0f164 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -80,7 +80,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_CREATE("lame client channel")); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -94,13 +95,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; op->on_connectivity_state_change->cb( - exec_ctx, op->on_connectivity_state_change->cb_arg, 1); + exec_ctx, op->on_connectivity_state_change->cb_arg, GRPC_ERROR_NONE); } if (op->on_consumed != NULL) { - op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1); + op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, GRPC_ERROR_NONE); } if (op->send_ping != NULL) { - op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, 0); + op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, + GRPC_ERROR_CREATE("lame client channel")); } } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 2db95b9cdf..61d1624f49 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -232,7 +232,8 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success); +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, + grpc_error *error); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, requested_call *rc); /* Before calling maybe_finish_shutdown, we must hold mu_global and not @@ -265,7 +266,7 @@ struct shutdown_cleanup_args { }; static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status_ignored) { + grpc_error *error) { struct shutdown_cleanup_args *a = arg; gpr_slice_unref(a->slice); gpr_free(a); @@ -320,7 +321,8 @@ static void request_matcher_destroy(request_matcher *rm) { gpr_stack_lockfree_destroy(rm->requests); } -static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { +static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, + grpc_error *error) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -335,7 +337,8 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } } @@ -398,7 +401,7 @@ static void orphan_channel(channel_data *chand) { } static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, - bool success) { + grpc_error *error) { channel_data *chand = cd; grpc_server *server = chand->server; GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); @@ -487,23 +490,24 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); channel_data *chand = elem->channel_data; server_ref(chand->server); - grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc, - &rc->completion); + grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, + done_request_event, rc, &rc->completion); } -static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { call_data *calld = arg; request_matcher *rm = calld->request_matcher; grpc_server *server = rm->server; - if (!success || gpr_atm_acq_load(&server->shutdown_flag)) { + if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, error, NULL); return; } @@ -540,7 +544,8 @@ static void finish_start_new_rpc( calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); return; } @@ -548,7 +553,7 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, calld, true); + publish_new_rpc(exec_ctx, calld, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; @@ -676,7 +681,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, for (i = 0; i < server->num_shutdown_tags; i++) { server_ref(server); grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq, - server->shutdown_tags[i].tag, 1, done_shutdown_event, server, + server->shutdown_tags[i].tag, GRPC_ERROR_NONE, + done_shutdown_event, server, &server->shutdown_tags[i].completion); } } @@ -699,7 +705,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - bool success) { + grpc_error *error) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; @@ -712,11 +718,12 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->host && calld->path) { /* do nothing */ } else { - success = 0; + error = + GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } calld->on_done_recv_initial_metadata->cb( - exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success); + exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, error); } static void server_mutate_op(grpc_call_element *elem, @@ -741,10 +748,10 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, - bool success) { + grpc_error *error) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - if (success) { + if (error == GRPC_ERROR_NONE) { start_new_rpc(exec_ctx, elem); } else { gpr_mu_lock(&calld->mu_state); @@ -752,7 +759,8 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -785,7 +793,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, - bool iomgr_status_ignored) { + grpc_error *error) { channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { @@ -1103,7 +1111,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, } static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s, - bool success) { + grpc_error *error) { grpc_server *server = s; gpr_mu_lock(&server->mu_global); server->listeners_destroyed++; @@ -1125,8 +1133,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { - grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, + NULL, gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; } @@ -1258,8 +1266,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, - NULL); + grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index e24ee638fd..cdfc298fa9 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -67,18 +67,18 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker) { - int success; + grpc_error *error; grpc_connectivity_state_watcher *w; while ((w = tracker->watchers)) { tracker->watchers = w->next; if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { *w->current = GRPC_CHANNEL_FATAL_FAILURE; - success = 1; + error = GRPC_ERROR_NONE; } else { - success = 0; + error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); } - grpc_exec_ctx_enqueue(exec_ctx, w->notify, success, NULL); + grpc_exec_ctx_push(exec_ctx, w->notify, error, NULL); gpr_free(w); } gpr_free(tracker->name); @@ -109,7 +109,7 @@ int grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL); + grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); return 0; @@ -117,7 +117,7 @@ int grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL); + grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); return 0; @@ -128,7 +128,7 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, true, NULL); + grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_NONE, NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -158,7 +158,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_exec_ctx_enqueue(exec_ctx, w->notify, true, NULL); + grpc_exec_ctx_push(exec_ctx, w->notify, GRPC_ERROR_NONE, NULL); gpr_free(w); } } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index e6d524abe6..f7362973a9 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -60,7 +60,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, true, NULL); + grpc_exec_ctx_push(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); } } @@ -143,11 +143,14 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, return transport->vtable->get_peer(exec_ctx, transport); } -void grpc_transport_stream_op_finish_with_failure( - grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL); - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); +void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op, + grpc_error *error) { + grpc_exec_ctx_push(exec_ctx, op->recv_message_ready, grpc_error_ref(error), + NULL); + grpc_exec_ctx_push(exec_ctx, op->recv_initial_metadata_ready, + grpc_error_ref(error), NULL); + grpc_exec_ctx_push(exec_ctx, op->on_complete, error, NULL); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, @@ -171,11 +174,11 @@ typedef struct { grpc_closure closure; } close_message_data; -static void free_message(grpc_exec_ctx *exec_ctx, void *p, bool iomgr_success) { +static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { close_message_data *cmd = p; gpr_slice_unref(cmd->message); if (cmd->then_call != NULL) { - cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, iomgr_success); + cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, grpc_error_ref(error)); } gpr_free(cmd); } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 482a9d1791..c4da1027cd 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -216,7 +216,8 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_stream *stream, void *and_free_memory); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op); + grpc_transport_stream_op *op, + grpc_error *error); void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, grpc_status_code status); |