aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c197
1 files changed, 89 insertions, 108 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c2547c5147..895a8a3b06 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -51,6 +51,7 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -138,14 +139,15 @@ typedef struct batch_control {
} batch_control;
struct grpc_call {
+ gpr_arena *arena;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
gpr_timespec start_time;
- /* TODO(ctiller): share with cq if possible? */
- gpr_mu mu;
+ /* protects first_child, and child next/prev links */
+ gpr_mu child_list_mu;
/* client or server call */
bool is_client;
@@ -160,8 +162,8 @@ struct grpc_call {
bool received_initial_metadata;
bool receiving_message;
bool requested_final_op;
- bool received_final_op;
- bool sent_any_op;
+ gpr_atm any_ops_sent_atm;
+ gpr_atm received_final_op_atm;
/* have we received initial metadata */
bool has_initial_md_been_received;
@@ -212,6 +214,8 @@ struct grpc_call {
grpc_closure receiving_initial_metadata_ready;
uint32_t test_only_last_message_flags;
+ grpc_closure release_call;
+
union {
struct {
grpc_status_code *status;
@@ -260,7 +264,7 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
static void add_init_error(grpc_error **composite, grpc_error *new) {
if (new == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE)
- *composite = GRPC_ERROR_CREATE("Call creation failed");
+ *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
*composite = grpc_error_add_child(*composite, new);
}
@@ -273,9 +277,13 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
grpc_channel_get_channel_stack(args->channel);
grpc_call *call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
- call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+ gpr_arena *arena =
+ gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel));
+ call = gpr_arena_alloc(arena,
+ sizeof(grpc_call) + channel_stack->call_stack_size);
+ call->arena = arena;
*out_call = call;
- gpr_mu_init(&call->mu);
+ gpr_mu_init(&call->child_list_mu);
call->channel = args->channel;
call->cq = args->cq;
call->parent = args->parent_call;
@@ -313,7 +321,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent_call->is_client);
- gpr_mu_lock(&args->parent_call->mu);
+ gpr_mu_lock(&args->parent_call->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
@@ -327,20 +335,24 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
* call. */
if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
- add_init_error(&error,
- GRPC_ERROR_CREATE("Census tracing propagation requested "
- "without Census context propagation"));
+ add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Census tracing propagation requested "
+ "without Census context propagation"));
}
grpc_call_context_set(
call, GRPC_CONTEXT_TRACING,
args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
} else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
- add_init_error(&error,
- GRPC_ERROR_CREATE("Census context propagation requested "
- "without Census tracing propagation"));
+ add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Census context propagation requested "
+ "without Census tracing propagation"));
}
if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
+ if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) {
+ cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
+ }
}
if (args->parent_call->first_child == NULL) {
@@ -353,18 +365,23 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
call;
}
- gpr_mu_unlock(&args->parent_call->mu);
+ gpr_mu_unlock(&args->parent_call->child_list_mu);
}
call->send_deadline = send_deadline;
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
+ grpc_call_element_args call_args = {
+ .call_stack = CALL_STACK_FROM_CALL(call),
+ .server_transport_data = args->server_transport_data,
+ .context = call->context,
+ .path = path,
+ .start_time = call->start_time,
+ .deadline = send_deadline,
+ .arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
- destroy_call, call, call->context,
- args->server_transport_data, path,
- call->start_time, send_deadline,
- CALL_STACK_FROM_CALL(call)));
+ destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
@@ -421,6 +438,14 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
+static void release_call(grpc_exec_ctx *exec_ctx, void *call,
+ grpc_error *error) {
+ grpc_call *c = call;
+ grpc_channel *channel = c->channel;
+ grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
+}
+
static void set_status_value_directly(grpc_status_code status, void *dest);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
@@ -435,7 +460,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- gpr_mu_destroy(&c->mu);
+ gpr_mu_destroy(&c->child_list_mu);
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
}
@@ -447,7 +472,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->cq) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- grpc_channel *channel = c->channel;
get_final_status(call, set_status_value_directly, &c->final_info.final_status,
NULL);
@@ -456,11 +480,12 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
GRPC_ERROR_UNREF(
- unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
+ unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
}
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
+ grpc_closure_init(&c->release_call, release_call, c,
+ grpc_schedule_on_exec_ctx));
GPR_TIMER_END("destroy_call", 0);
}
@@ -473,7 +498,7 @@ void grpc_call_destroy(grpc_call *c) {
GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
if (parent) {
- gpr_mu_lock(&parent->mu);
+ gpr_mu_lock(&parent->child_list_mu);
if (c == parent->first_child) {
parent->first_child = c->sibling_next;
if (c == parent->first_child) {
@@ -482,15 +507,14 @@ void grpc_call_destroy(grpc_call *c) {
c->sibling_prev->sibling_next = c->sibling_next;
c->sibling_next->sibling_prev = c->sibling_prev;
}
- gpr_mu_unlock(&parent->mu);
+ gpr_mu_unlock(&parent->child_list_mu);
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
}
- gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- cancel = c->sent_any_op && !c->received_final_op;
- gpr_mu_unlock(&c->mu);
+ cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) &&
+ !gpr_atm_acq_load(&c->received_final_op_atm);
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
@@ -555,60 +579,33 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == NULL);
- gpr_mu_lock(&c->mu);
cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
description);
- gpr_mu_unlock(&c->mu);
grpc_exec_ctx_finish(&exec_ctx);
return GRPC_CALL_OK;
}
-typedef struct termination_closure {
- grpc_closure closure;
- grpc_call *call;
- grpc_transport_stream_op op;
-} termination_closure;
-
-static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
- grpc_error *error) {
- termination_closure *tc = tcp;
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination");
- gpr_free(tc);
-}
-
-static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
+static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
- termination_closure *tc = tcp;
- memset(&tc->op, 0, sizeof(tc->op));
- tc->op.cancel_error = GRPC_ERROR_REF(error);
- /* reuse closure to catch completion */
- tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
- grpc_schedule_on_exec_ctx);
- execute_op(exec_ctx, tc->call, &tc->op);
-}
-
-static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
- grpc_error *error) {
- termination_closure *tc = gpr_malloc(sizeof(*tc));
- memset(tc, 0, sizeof(*tc));
- tc->call = c;
- GRPC_CALL_INTERNAL_REF(tc->call, "termination");
- grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination,
- tc, grpc_schedule_on_exec_ctx),
- error);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
+ GRPC_CALL_INTERNAL_REF(c, "termination");
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- terminate_with_error(exec_ctx, c, error);
+ grpc_transport_stream_op *op = grpc_make_transport_stream_op(
+ grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
+ op->cancel_error = error;
+ execute_op(exec_ctx, c, op);
}
static grpc_error *error_from_status(grpc_status_code status,
const char *description) {
return grpc_error_set_int(
- grpc_error_set_str(GRPC_ERROR_CREATE(description),
- GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE,
+ grpc_slice_from_copied_string(description)),
GRPC_ERROR_INT_GRPC_STATUS, status);
}
@@ -628,16 +625,15 @@ static bool get_final_status_from(
void (*set_value)(grpc_status_code code, void *user_data),
void *set_value_user_data, grpc_slice *details) {
grpc_status_code code;
- const char *msg = NULL;
- grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL);
+ grpc_slice slice;
+ grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
set_value(code, set_value_user_data);
if (details != NULL) {
- *details =
- msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg);
+ *details = grpc_slice_ref_internal(slice);
}
return true;
}
@@ -715,9 +711,7 @@ static void set_incoming_compression_algorithm(
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
- gpr_mu_lock(&call->mu);
algorithm = call->incoming_compression_algorithm;
- gpr_mu_unlock(&call->mu);
return algorithm;
}
@@ -729,9 +723,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
- gpr_mu_lock(&call->mu);
flags = call->test_only_last_message_flags;
- gpr_mu_unlock(&call->mu);
return flags;
}
@@ -785,9 +777,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
uint32_t encodings_accepted_by_peer;
- gpr_mu_lock(&call->mu);
encodings_accepted_by_peer = call->encodings_accepted_by_peer;
- gpr_mu_unlock(&call->mu);
return encodings_accepted_by_peer;
}
@@ -906,18 +896,19 @@ static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_error *error =
status_code == GRPC_STATUS_OK
? GRPC_ERROR_NONE
- : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"),
+ : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Error received from peer"),
GRPC_ERROR_INT_GRPC_STATUS,
(intptr_t)status_code);
if (b->idx.named.grpc_message != NULL) {
- char *msg =
- grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md));
- error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg);
- gpr_free(msg);
+ error = grpc_error_set_str(
+ error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
} else if (error != GRPC_ERROR_NONE) {
- error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, "");
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ grpc_empty_slice());
}
set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
@@ -1056,7 +1047,7 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
}
static grpc_error *consolidate_batch_errors(batch_control *bctl) {
- size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors);
+ size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors);
if (n == 0) {
return GRPC_ERROR_NONE;
} else if (n == 1) {
@@ -1066,8 +1057,8 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) {
bctl->errors[0] = NULL;
return e;
} else {
- grpc_error *error =
- GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n);
+ grpc_error *error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Call batch failed", bctl->errors, n);
for (size_t i = 0; i < n; i++) {
GRPC_ERROR_UNREF(bctl->errors[i]);
bctl->errors[i] = NULL;
@@ -1083,8 +1074,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
grpc_call *call = bctl->call;
grpc_error *error = consolidate_batch_errors(bctl);
- gpr_mu_lock(&call->mu);
-
if (bctl->send_initial_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
@@ -1103,20 +1092,23 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(exec_ctx, call, md);
- call->received_final_op = true;
/* propagate cancellation to any interested children */
+ gpr_atm_rel_store(&call->received_final_op_atm, 1);
+ gpr_mu_lock(&call->child_list_mu);
child_call = call->first_child;
if (child_call != NULL) {
do {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
- grpc_call_cancel(child_call, NULL);
+ cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
+ gpr_mu_unlock(&call->child_list_mu);
if (call->is_client) {
get_final_status(call, set_status_value_directly,
@@ -1130,7 +1122,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
- gpr_mu_unlock(&call->mu);
if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */
@@ -1221,7 +1212,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
@@ -1233,11 +1223,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
- gpr_mu_unlock(&bctl->call->mu);
process_data_after_md(exec_ctx, bctlp);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
- gpr_mu_unlock(&bctl->call->mu);
}
}
@@ -1296,7 +1284,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
- int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
+ int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
if (idx == 0 && !has_cancelled) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
@@ -1309,8 +1297,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- gpr_mu_lock(&call->mu);
-
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1336,11 +1322,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
grpc_schedule_on_exec_ctx);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_closure_sched(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
+ grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
- gpr_mu_unlock(&call->mu);
-
finish_batch_step(exec_ctx, bctl);
}
@@ -1393,7 +1377,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->notify_tag = notify_tag;
bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
- gpr_mu_lock(&call->mu);
grpc_transport_stream_op *stream_op = &bctl->op;
memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
@@ -1539,7 +1522,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
{
grpc_error *override_error = GRPC_ERROR_NONE;
if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
- override_error = GRPC_ERROR_CREATE("Error from server send status");
+ override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Error from server send status");
}
if (op->data.send_status_from_server.status_details != NULL) {
call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
@@ -1549,8 +1533,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->send_extra_metadata_count++;
char *msg = grpc_slice_to_c_string(
GRPC_MDVALUE(call->send_extra_metadata[1].md));
- override_error = grpc_error_set_str(
- override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg);
+ override_error =
+ grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
@@ -1679,8 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
- call->sent_any_op = true;
- gpr_mu_unlock(&call->mu);
+ gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_op(exec_ctx, call, stream_op);
@@ -1711,7 +1695,6 @@ done_with_error:
if (bctl->recv_final_op) {
call->requested_final_op = 0;
}
- gpr_mu_unlock(&call->mu);
goto done;
}
@@ -1760,10 +1743,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call *call, grpc_compression_level level) {
- gpr_mu_lock(&call->mu);
grpc_compression_algorithm algo =
compression_algorithm_for_level_locked(call, level);
- gpr_mu_unlock(&call->mu);
return algo;
}