aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-02-08 16:39:16 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-02-08 16:39:16 -0800
commit89d33790d168740bad8fafdaf29577f8c28cb5c8 (patch)
treef157820b725d660d14f7caace35b1e072ed9d609 /src/core/lib/surface/call.c
parente5025380cf4cda0b1ede29c03dd3581ab44d3267 (diff)
Begin removing used_batches from call, allowing the removal of a set of mutex acquisitions
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c61
1 files changed, 28 insertions, 33 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 3352e427cd..c7631c7d32 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -142,8 +142,6 @@ struct grpc_call {
bool destroy_called;
/** flag indicating that cancellation is inherited */
bool cancellation_is_inherited;
- /** bitmask of live batches */
- uint8_t used_batches;
/** which ops are in-flight */
bool sent_initial_metadata;
bool sending_message;
@@ -991,25 +989,23 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
-static batch_control *allocate_batch_control(grpc_call *call) {
- size_t i;
- for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
- if ((call->used_batches & (1 << i)) == 0) {
- call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
- return &call->active_batches[i];
- }
+static int batch_slot_for_op(grpc_op_type type) { return (int)type; }
+
+static batch_control *allocate_batch_control(grpc_call *call,
+ const grpc_op *ops,
+ size_t num_ops) {
+ int slot = batch_slot_for_op(ops[0].op);
+ for (size_t i = 1; i < num_ops; i++) {
+ int op_slot = batch_slot_for_op(ops[i].op);
+ slot = GPR_MIN(slot, op_slot);
}
- return NULL;
+ return &call->active_batches[slot];
}
static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_cq_completion *storage) {
batch_control *bctl = user_data;
grpc_call *call = bctl->call;
- gpr_mu_lock(&call->mu);
- call->used_batches = (uint8_t)(
- call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
- gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
}
@@ -1093,11 +1089,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */
grpc_closure_run(exec_ctx, bctl->notify_tag, error);
- gpr_mu_lock(&call->mu);
- bctl->call->used_batches =
- (uint8_t)(bctl->call->used_batches &
- ~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
- gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
@@ -1309,6 +1300,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
finish_batch_step(exec_ctx, bctl);
}
+static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_cq_completion *completion) {
+ gpr_free(completion);
+}
+
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
@@ -1323,32 +1319,31 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_metadata compression_md;
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
-
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
+ if (nops == 0) {
+ if (!is_notify_tag_closure) {
+ grpc_cq_begin_op(call->cq, notify_tag);
+ grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
+ free_no_op_completion, NULL,
+ gpr_malloc(sizeof(grpc_cq_completion)));
+ }
+ error = GRPC_CALL_OK;
+ goto done;
+ }
+
/* TODO(ctiller): this feels like it could be made lock-free */
- gpr_mu_lock(&call->mu);
- bctl = allocate_batch_control(call);
+ bctl = allocate_batch_control(call, ops, nops);
memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+ gpr_mu_lock(&call->mu);
grpc_transport_stream_op *stream_op = &bctl->op;
memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
- if (nops == 0) {
- GRPC_CALL_INTERNAL_REF(call, "completion");
- if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
- }
- gpr_mu_unlock(&call->mu);
- post_batch_completion(exec_ctx, bctl);
- error = GRPC_CALL_OK;
- goto done;
- }
-
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
op = &ops[i];