aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c164
-rw-r--r--src/core/lib/surface/completion_queue.c2
-rw-r--r--src/core/lib/surface/lame_client.c2
-rw-r--r--src/core/lib/surface/server.c19
4 files changed, 114 insertions, 73 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 48a1e586e1..cc57654ea4 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -101,7 +101,18 @@ typedef struct {
grpc_error *error;
} received_status;
-#define MAX_ERRORS_PER_BATCH 3
+static gpr_atm pack_received_status(received_status r) {
+ return r.is_set ? (1 | (gpr_atm)r.error) : 0;
+}
+
+static received_status unpack_received_status(gpr_atm atm) {
+ return (atm & 1) == 0
+ ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE}
+ : (received_status){.is_set = true,
+ .error = (grpc_error *)(atm & ~(gpr_atm)1)};
+}
+
+#define MAX_ERRORS_PER_BATCH 4
typedef struct batch_control {
grpc_call *call;
@@ -142,8 +153,6 @@ struct grpc_call {
bool destroy_called;
/** flag indicating that cancellation is inherited */
bool cancellation_is_inherited;
- /** bitmask of live batches */
- uint8_t used_batches;
/** which ops are in-flight */
bool sent_initial_metadata;
bool sending_message;
@@ -165,8 +174,8 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
- /* Received call statuses from various sources */
- received_status status[STATUS_SOURCE_COUNT];
+ /* Packed received call statuses from various sources */
+ gpr_atm status[STATUS_SOURCE_COUNT];
/* Call data useful used for reporting. Only valid after the call has
* completed */
@@ -245,7 +254,7 @@ static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- grpc_error *error);
+ grpc_error *error, bool has_cancelled);
static void add_init_error(grpc_error **composite, grpc_error *new) {
if (new == GRPC_ERROR_NONE) return;
@@ -263,9 +272,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
grpc_channel_get_channel_stack(args->channel);
grpc_call *call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
- call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+ call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size);
*out_call = call;
- memset(call, 0, sizeof(grpc_call));
gpr_mu_init(&call->mu);
call->channel = args->channel;
call->cq = args->cq;
@@ -446,7 +454,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- GRPC_ERROR_UNREF(c->status[i].error);
+ GRPC_ERROR_UNREF(
+ unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
@@ -614,13 +623,12 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
*/
static bool get_final_status_from(
- grpc_call *call, status_source from_source, bool allow_ok_status,
+ grpc_call *call, grpc_error *error, bool allow_ok_status,
void (*set_value)(grpc_status_code code, void *user_data),
void *set_value_user_data, grpc_slice *details) {
grpc_status_code code;
const char *msg = NULL;
- grpc_error_get_status(call->status[from_source].error, call->send_deadline,
- &code, &msg, NULL);
+ grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
@@ -638,12 +646,15 @@ static void get_final_status(grpc_call *call,
void *user_data),
void *set_value_user_data, grpc_slice *details) {
int i;
+ received_status status[STATUS_SOURCE_COUNT];
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
+ }
if (grpc_call_error_trace) {
gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- gpr_log(GPR_DEBUG, " %d: %s", i,
- grpc_error_string(call->status[i].error));
+ if (status[i].is_set) {
+ gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
}
}
}
@@ -653,9 +664,9 @@ static void get_final_status(grpc_call *call,
/* search for the best status we can present: ideally the error we use has a
clearly defined grpc-status, and we'll prefer that. */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set &&
- grpc_error_has_clear_grpc_status(call->status[i].error)) {
- if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
+ if (status[i].is_set &&
+ grpc_error_has_clear_grpc_status(status[i].error)) {
+ if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details)) {
return;
}
@@ -663,8 +674,8 @@ static void get_final_status(grpc_call *call,
}
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
+ if (status[i].is_set) {
+ if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details)) {
return;
}
@@ -681,12 +692,13 @@ static void get_final_status(grpc_call *call,
static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
status_source source, grpc_error *error) {
- if (call->status[source].is_set) {
+ if (!gpr_atm_rel_cas(&call->status[source],
+ pack_received_status((received_status){
+ .is_set = false, .error = GRPC_ERROR_NONE}),
+ pack_received_status((received_status){
+ .is_set = true, .error = error}))) {
GRPC_ERROR_UNREF(error);
- return;
}
- call->status[source].is_set = true;
- call->status[source].error = error;
}
/*******************************************************************************
@@ -997,25 +1009,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
-static batch_control *allocate_batch_control(grpc_call *call) {
- size_t i;
- for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
- if ((call->used_batches & (1 << i)) == 0) {
- call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
- return &call->active_batches[i];
- }
+static int batch_slot_for_op(grpc_op_type type) {
+ switch (type) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ return 0;
+ case GRPC_OP_SEND_MESSAGE:
+ return 1;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ return 2;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ return 3;
+ case GRPC_OP_RECV_MESSAGE:
+ return 4;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ return 5;
+ }
+ GPR_UNREACHABLE_CODE(return 123456789);
+}
+
+static batch_control *allocate_batch_control(grpc_call *call,
+ const grpc_op *ops,
+ size_t num_ops) {
+ int slot = batch_slot_for_op(ops[0].op);
+ for (size_t i = 1; i < num_ops; i++) {
+ int op_slot = batch_slot_for_op(ops[i].op);
+ slot = GPR_MIN(slot, op_slot);
+ }
+ batch_control *bctl = &call->active_batches[slot];
+ if (bctl->call != NULL) {
+ return NULL;
}
- return NULL;
+ memset(bctl, 0, sizeof(*bctl));
+ bctl->call = call;
+ return bctl;
}
static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_cq_completion *storage) {
batch_control *bctl = user_data;
grpc_call *call = bctl->call;
- gpr_mu_lock(&call->mu);
- call->used_batches = (uint8_t)(
- call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
- gpr_mu_unlock(&call->mu);
+ bctl->call = NULL;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
}
@@ -1098,12 +1133,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */
+ bctl->call = NULL;
grpc_closure_run(exec_ctx, bctl->notify_tag, error);
- gpr_mu_lock(&call->mu);
- bctl->call->used_batches =
- (uint8_t)(bctl->call->used_batches &
- ~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
- gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
@@ -1191,6 +1222,11 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
+ if (call->receiving_stream != NULL) {
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ }
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
@@ -1257,10 +1293,10 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
}
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- grpc_error *error) {
+ grpc_error *error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
- if (idx == 0) {
+ if (idx == 0 && !has_cancelled) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
@@ -1274,7 +1310,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1311,10 +1347,15 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
+static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_cq_completion *completion) {
+ gpr_free(completion);
+}
+
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
@@ -1329,32 +1370,33 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_metadata compression_md;
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
-
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
- /* TODO(ctiller): this feels like it could be made lock-free */
- gpr_mu_lock(&call->mu);
- bctl = allocate_batch_control(call);
- memset(bctl, 0, sizeof(*bctl));
- bctl->call = call;
- bctl->notify_tag = notify_tag;
- bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
-
- grpc_transport_stream_op *stream_op = &bctl->op;
- memset(stream_op, 0, sizeof(*stream_op));
- stream_op->covered_by_poller = true;
-
if (nops == 0) {
- GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
+ grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
+ free_no_op_completion, NULL,
+ gpr_malloc(sizeof(grpc_cq_completion)));
+ } else {
+ grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
}
- gpr_mu_unlock(&call->mu);
- post_batch_completion(exec_ctx, bctl);
error = GRPC_CALL_OK;
goto done;
}
+ bctl = allocate_batch_control(call, ops, nops);
+ if (bctl == NULL) {
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+ bctl->notify_tag = notify_tag;
+ bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+
+ gpr_mu_lock(&call->mu);
+ grpc_transport_stream_op *stream_op = &bctl->op;
+ memset(stream_op, 0, sizeof(*stream_op));
+ stream_op->covered_by_poller = true;
+
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
op = &ops[i];
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b80656aa8d..b4594817e4 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -118,7 +118,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
- cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
+ cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 48de0e1d5b..49bc4c114b 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -122,7 +122,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
gpr_atm_no_barrier_store(&calld->filled_metadata, 0);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 6ab1c0d94d..b360579553 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -540,7 +540,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_REF(error));
return;
}
@@ -879,7 +880,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -1015,12 +1016,10 @@ void grpc_server_register_non_listening_completion_queue(
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
- grpc_server *server = gpr_malloc(sizeof(grpc_server));
+ grpc_server *server = gpr_zalloc(sizeof(grpc_server));
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
- memset(server, 0, sizeof(grpc_server));
-
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
@@ -1069,8 +1068,7 @@ void *grpc_server_register_method(
flags);
return NULL;
}
- m = gpr_malloc(sizeof(registered_method));
- memset(m, 0, sizeof(*m));
+ m = gpr_zalloc(sizeof(registered_method));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
@@ -1174,8 +1172,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
if (num_registered_methods > 0) {
slots = 2 * num_registered_methods;
alloc = sizeof(channel_registered_method) * slots;
- chand->registered_methods = gpr_malloc(alloc);
- memset(chand->registered_methods, 0, alloc);
+ chand->registered_methods = gpr_zalloc(alloc);
for (rm = s->registered_methods; rm; rm = rm->next) {
grpc_slice host;
bool has_host;
@@ -1198,7 +1195,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
crm->server_registered_method = rm;
crm->flags = rm->flags;
crm->has_host = has_host;
- crm->host = host;
+ if (has_host) {
+ crm->host = host;
+ }
crm->method = method;
}
GPR_ASSERT(slots <= UINT32_MAX);