diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 164 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 19 |
4 files changed, 114 insertions, 73 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 48a1e586e1..cc57654ea4 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -101,7 +101,18 @@ typedef struct { grpc_error *error; } received_status; -#define MAX_ERRORS_PER_BATCH 3 +static gpr_atm pack_received_status(received_status r) { + return r.is_set ? (1 | (gpr_atm)r.error) : 0; +} + +static received_status unpack_received_status(gpr_atm atm) { + return (atm & 1) == 0 + ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE} + : (received_status){.is_set = true, + .error = (grpc_error *)(atm & ~(gpr_atm)1)}; +} + +#define MAX_ERRORS_PER_BATCH 4 typedef struct batch_control { grpc_call *call; @@ -142,8 +153,6 @@ struct grpc_call { bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; - /** bitmask of live batches */ - uint8_t used_batches; /** which ops are in-flight */ bool sent_initial_metadata; bool sending_message; @@ -165,8 +174,8 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; - /* Received call statuses from various sources */ - received_status status[STATUS_SOURCE_COUNT]; + /* Packed received call statuses from various sources */ + gpr_atm status[STATUS_SOURCE_COUNT]; /* Call data useful used for reporting. Only valid after the call has * completed */ @@ -245,7 +254,7 @@ static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl); static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, - grpc_error *error); + grpc_error *error, bool has_cancelled); static void add_init_error(grpc_error **composite, grpc_error *new) { if (new == GRPC_ERROR_NONE) return; @@ -263,9 +272,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); + call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size); *out_call = call; - memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); call->channel = args->channel; call->cq = args->cq; @@ -446,7 +454,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF(c->status[i].error); + GRPC_ERROR_UNREF( + unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); @@ -614,13 +623,12 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, */ static bool get_final_status_from( - grpc_call *call, status_source from_source, bool allow_ok_status, + grpc_call *call, grpc_error *error, bool allow_ok_status, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { grpc_status_code code; const char *msg = NULL; - grpc_error_get_status(call->status[from_source].error, call->send_deadline, - &code, &msg, NULL); + grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } @@ -638,12 +646,15 @@ static void get_final_status(grpc_call *call, void *user_data), void *set_value_user_data, grpc_slice *details) { int i; + received_status status[STATUS_SOURCE_COUNT]; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); + } if (grpc_call_error_trace) { gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - gpr_log(GPR_DEBUG, " %d: %s", i, - grpc_error_string(call->status[i].error)); + if (status[i].is_set) { + gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error)); } } } @@ -653,9 +664,9 @@ static void get_final_status(grpc_call *call, /* search for the best status we can present: ideally the error we use has a clearly defined grpc-status, and we'll prefer that. */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set && - grpc_error_has_clear_grpc_status(call->status[i].error)) { - if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, + if (status[i].is_set && + grpc_error_has_clear_grpc_status(status[i].error)) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } @@ -663,8 +674,8 @@ static void get_final_status(grpc_call *call, } /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, + if (status[i].is_set) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } @@ -681,12 +692,13 @@ static void get_final_status(grpc_call *call, static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error) { - if (call->status[source].is_set) { + if (!gpr_atm_rel_cas(&call->status[source], + pack_received_status((received_status){ + .is_set = false, .error = GRPC_ERROR_NONE}), + pack_received_status((received_status){ + .is_set = true, .error = error}))) { GRPC_ERROR_UNREF(error); - return; } - call->status[source].is_set = true; - call->status[source].error = error; } /******************************************************************************* @@ -997,25 +1009,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { return !(flags & invalid_positions); } -static batch_control *allocate_batch_control(grpc_call *call) { - size_t i; - for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { - if ((call->used_batches & (1 << i)) == 0) { - call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i)); - return &call->active_batches[i]; - } +static int batch_slot_for_op(grpc_op_type type) { + switch (type) { + case GRPC_OP_SEND_INITIAL_METADATA: + return 0; + case GRPC_OP_SEND_MESSAGE: + return 1; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + case GRPC_OP_SEND_STATUS_FROM_SERVER: + return 2; + case GRPC_OP_RECV_INITIAL_METADATA: + return 3; + case GRPC_OP_RECV_MESSAGE: + return 4; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + case GRPC_OP_RECV_STATUS_ON_CLIENT: + return 5; + } + GPR_UNREACHABLE_CODE(return 123456789); +} + +static batch_control *allocate_batch_control(grpc_call *call, + const grpc_op *ops, + size_t num_ops) { + int slot = batch_slot_for_op(ops[0].op); + for (size_t i = 1; i < num_ops; i++) { + int op_slot = batch_slot_for_op(ops[i].op); + slot = GPR_MIN(slot, op_slot); + } + batch_control *bctl = &call->active_batches[slot]; + if (bctl->call != NULL) { + return NULL; } - return NULL; + memset(bctl, 0, sizeof(*bctl)); + bctl->call = call; + return bctl; } static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, grpc_cq_completion *storage) { batch_control *bctl = user_data; grpc_call *call = bctl->call; - gpr_mu_lock(&call->mu); - call->used_batches = (uint8_t)( - call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches))); - gpr_mu_unlock(&call->mu); + bctl->call = NULL; GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } @@ -1098,12 +1133,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ + bctl->call = NULL; grpc_closure_run(exec_ctx, bctl->notify_tag, error); - gpr_mu_lock(&call->mu); - bctl->call->used_batches = - (uint8_t)(bctl->call->used_batches & - ~(uint8_t)(1 << (bctl - bctl->call->active_batches))); - gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ @@ -1191,6 +1222,11 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); if (error != GRPC_ERROR_NONE) { + if (call->receiving_stream != NULL) { + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + } + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true); cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } @@ -1257,10 +1293,10 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, - grpc_error *error) { + grpc_error *error, bool has_cancelled) { if (error == GRPC_ERROR_NONE) return; int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); - if (idx == 0) { + if (idx == 0 && !has_cancelled) { cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); } @@ -1274,7 +1310,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error)); + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; @@ -1311,10 +1347,15 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error)); + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); finish_batch_step(exec_ctx, bctl); } +static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p, + grpc_cq_completion *completion) { + gpr_free(completion); +} + static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1329,32 +1370,33 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_metadata compression_md; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); - GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); - /* TODO(ctiller): this feels like it could be made lock-free */ - gpr_mu_lock(&call->mu); - bctl = allocate_batch_control(call); - memset(bctl, 0, sizeof(*bctl)); - bctl->call = call; - bctl->notify_tag = notify_tag; - bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - - grpc_transport_stream_op *stream_op = &bctl->op; - memset(stream_op, 0, sizeof(*stream_op)); - stream_op->covered_by_poller = true; - if (nops == 0) { - GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); + grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, + free_no_op_completion, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); + } else { + grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE); } - gpr_mu_unlock(&call->mu); - post_batch_completion(exec_ctx, bctl); error = GRPC_CALL_OK; goto done; } + bctl = allocate_batch_control(call, ops, nops); + if (bctl == NULL) { + return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; + } + bctl->notify_tag = notify_tag; + bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + + gpr_mu_lock(&call->mu); + grpc_transport_stream_op *stream_op = &bctl->op; + memset(stream_op, 0, sizeof(*stream_op)); + stream_op->covered_by_poller = true; + /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { op = &ops[i]; diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b80656aa8d..b4594817e4 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -118,7 +118,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); - cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); + cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 48de0e1d5b..49bc4c114b 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -122,7 +122,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; gpr_atm_no_barrier_store(&calld->filled_metadata, 0); return GRPC_ERROR_NONE; diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 6ab1c0d94d..b360579553 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -540,7 +540,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error); + grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_REF(error)); return; } @@ -879,7 +880,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -1015,12 +1016,10 @@ void grpc_server_register_non_listening_completion_queue( grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); - grpc_server *server = gpr_malloc(sizeof(grpc_server)); + grpc_server *server = gpr_zalloc(sizeof(grpc_server)); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); - memset(server, 0, sizeof(grpc_server)); - gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); @@ -1069,8 +1068,7 @@ void *grpc_server_register_method( flags); return NULL; } - m = gpr_malloc(sizeof(registered_method)); - memset(m, 0, sizeof(*m)); + m = gpr_zalloc(sizeof(registered_method)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -1174,8 +1172,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, if (num_registered_methods > 0) { slots = 2 * num_registered_methods; alloc = sizeof(channel_registered_method) * slots; - chand->registered_methods = gpr_malloc(alloc); - memset(chand->registered_methods, 0, alloc); + chand->registered_methods = gpr_zalloc(alloc); for (rm = s->registered_methods; rm; rm = rm->next) { grpc_slice host; bool has_host; @@ -1198,7 +1195,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, crm->server_registered_method = rm; crm->flags = rm->flags; crm->has_host = has_host; - crm->host = host; + if (has_host) { + crm->host = host; + } crm->method = method; } GPR_ASSERT(slots <= UINT32_MAX); |