Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--  src/core/lib/surface/call.c          148
-rw-r--r--  src/core/lib/surface/init.c            2
-rw-r--r--  src/core/lib/surface/lame_client.cc   19
-rw-r--r--  src/core/lib/surface/server.c          2
4 files changed, 54 insertions, 117 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 03eaaf99ac..e68c201134 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,7 +122,6 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
- grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -152,7 +151,6 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
- grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -186,11 +184,6 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
- grpc_metadata compression_md;
-
- // A char* indicating the peer name.
- gpr_atm peer_string;
-
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -269,9 +262,8 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op,
- grpc_closure *start_batch_closure);
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -336,7 +328,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
- grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -445,8 +436,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena,
- .call_combiner = &call->call_combiner};
+ .arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -513,8 +503,6 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
- grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -614,37 +602,30 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-// This is called via the call combiner to start sending a batch down
-// the filter stack.
-static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *ignored) {
- grpc_transport_stream_op_batch *batch = arg;
- grpc_call *call = batch->handler_private.extra_arg;
- GPR_TIMER_BEGIN("execute_batch", 0);
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
- GPR_TIMER_END("execute_batch", 0);
-}
-
-// start_batch_closure points to a caller-allocated closure to be used
-// for entering the call combiner.
-static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *batch,
- grpc_closure *start_batch_closure) {
- batch->handler_private.extra_arg = call;
- GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
- GRPC_ERROR_NONE, "executing batch");
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op) {
+ grpc_call_element *elem;
+
+ GPR_TIMER_BEGIN("execute_op", 0);
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
+ GPR_TIMER_END("execute_op", 0);
}
char *grpc_call_get_peer(grpc_call *call) {
- char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
- if (peer_string != NULL) return gpr_strdup(peer_string);
- peer_string = grpc_channel_get_target(call->channel);
- if (peer_string != NULL) return peer_string;
- return gpr_strdup("unknown");
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ char *result;
+ GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
+ result = elem->filter->get_peer(&exec_ctx, elem);
+ if (result == NULL) {
+ result = grpc_channel_get_target(call->channel);
+ }
+ if (result == NULL) {
+ result = gpr_strdup("unknown");
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+ return result;
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
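(Annotation, not part of the diff.) The hunk above reroutes grpc_call_get_peer through the filter stack's get_peer vtable entry instead of a cached atomic string on the call. grpc_call_get_peer itself is the public C-core API and its contract is unchanged: it returns a heap string the caller must release with gpr_free. A minimal caller-side sketch, assuming an already-created call (the helper name is illustrative):

    #include <grpc/grpc.h>
    #include <grpc/support/alloc.h>
    #include <stdio.h>

    /* Hypothetical helper: log the peer address of an existing call. */
    static void log_call_peer(grpc_call *call) {
      char *peer = grpc_call_get_peer(call); /* falls back to "unknown", never NULL */
      printf("peer: %s\n", peer);
      gpr_free(peer); /* caller owns the returned string */
    }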
@@ -671,41 +652,20 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-typedef struct {
- grpc_call *call;
- grpc_closure start_batch;
- grpc_closure finish_batch;
-} cancel_state;
-
-// The on_complete callback used when sending a cancel_stream batch down
-// the filter stack. Yields the call combiner when the batch is done.
-static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
+static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
- cancel_state *state = (cancel_state *)arg;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
- "on_complete for cancel_stream op");
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
- gpr_free(state);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
- // Inform the call combiner of the cancellation, so that it can cancel
- // any in-flight asynchronous actions that may be holding the call
- // combiner. This ensures that the cancel_stream batch can be sent
- // down the filter stack in a timely manner.
- grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
- state->call = c;
- GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
- grpc_schedule_on_exec_ctx);
- grpc_transport_stream_op_batch *op =
- grpc_make_transport_stream_op(&state->finish_batch);
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_batch(exec_ctx, c, op, &state->start_batch);
+ execute_op(exec_ctx, c, op);
}
static grpc_error *error_from_status(grpc_status_code status,
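(Annotation, not part of the diff.) The cancellation hunk above drops the caller-owned cancel_state and its GRPC_CLOSURE_INIT in favor of GRPC_CLOSURE_CREATE, which heap-allocates a closure that cleans itself up after running. A rough sketch of the two closure idioms as they exist at this revision of the tree; everything other than the two macros and the callback signature is illustrative:

    #include "src/core/lib/iomgr/closure.h"
    #include "src/core/lib/iomgr/exec_ctx.h"

    /* Callback shape expected by both macros in this tree. */
    static void on_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
      (void)exec_ctx; (void)arg; (void)error;
    }

    static void closure_idioms(void *some_arg) {
      /* Heap-allocated: the wrapper frees itself after the callback runs. */
      grpc_closure *heap_c =
          GRPC_CLOSURE_CREATE(on_done, some_arg, grpc_schedule_on_exec_ctx);
      (void)heap_c;

      /* Caller-owned storage, typically a field embedded in a larger struct. */
      static grpc_closure storage;
      GRPC_CLOSURE_INIT(&storage, on_done, some_arg, grpc_schedule_on_exec_ctx);
    }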
@@ -1471,18 +1431,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-// The recv_message_ready callback used when sending a batch containing
-// a recv_message op down the filter stack. Yields the call combiner
-// before processing the received message.
-static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *bctlp,
- grpc_error *error) {
- batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
- receiving_stream_ready(exec_ctx, bctlp, error);
-}
-
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
@@ -1589,9 +1537,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
- "recv_initial_metadata_ready");
-
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1645,8 +1590,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1666,6 +1610,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
+ // sent_initial_metadata guards against variable reuse.
+ grpc_metadata compression_md;
+
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1713,7 +1660,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&call->compression_md, 0, sizeof(call->compression_md));
+ memset(&compression_md, 0, sizeof(compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level =
GRPC_COMPRESS_LEVEL_NONE;
@@ -1751,9 +1698,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_stream_compression_algorithm calgo =
stream_compression_algorithm_for_level_locked(
call, effective_stream_compression_level);
- call->compression_md.key =
+ compression_md.key =
GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
- call->compression_md.value =
+ compression_md.value =
grpc_stream_compression_algorithm_slice(calgo);
} else {
const grpc_compression_algorithm calgo =
@@ -1761,10 +1708,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
/* the following will be picked up by the compress filter and used
* as the call's compression algorithm. */
- call->compression_md.key =
- GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- call->compression_md.value =
- grpc_compression_algorithm_slice(calgo);
+ compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ compression_md.value = grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}
}
@@ -1779,7 +1724,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &call->compression_md, (int)additional_metadata_count)) {
+ &compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1791,10 +1736,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
- if (call->is_client) {
- stream_op_payload->send_initial_metadata.peer_string =
- &call->peer_string;
- }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1927,10 +1868,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
- if (!call->is_client) {
- stream_op_payload->recv_initial_metadata.peer_string =
- &call->peer_string;
- }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1947,9 +1884,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
- receiving_stream_ready_in_call_combiner, bctl,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
+ bctl, grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -2019,7 +1955,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
+ execute_op(exec_ctx, call, stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 280315036f..898476daee 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -31,7 +31,6 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -130,7 +129,6 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_trace_channel_stack_builder);
grpc_register_tracer(&grpc_http1_trace);
grpc_register_tracer(&grpc_cq_pluck_trace); // default on
- grpc_register_tracer(&grpc_call_combiner_trace);
grpc_register_tracer(&grpc_combiner_trace);
grpc_register_tracer(&grpc_server_channel_trace);
grpc_register_tracer(&grpc_bdp_estimator_trace);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 6286f9159d..a0791080a9 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,7 +40,6 @@ namespace grpc_core {
namespace {
struct CallData {
- grpc_call_combiner *call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -53,14 +52,14 @@ struct ChannelData {
static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *mdb) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ CallData *calld = static_cast<CallData *>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
+ ChannelData *chand = static_cast<ChannelData *>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -80,7 +79,6 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void lame_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -89,8 +87,12 @@ static void lame_start_transport_stream_op_batch(
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
- calld->call_combiner);
+ exec_ctx, op,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
+}
+
+static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ return NULL;
}
static void lame_get_channel_info(grpc_exec_ctx *exec_ctx,
@@ -120,8 +122,6 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
- calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -156,6 +156,7 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
sizeof(grpc_core::ChannelData),
grpc_core::init_channel_elem,
grpc_core::destroy_channel_elem,
+ grpc_core::lame_get_peer,
grpc_core::lame_get_channel_info,
"lame-client",
};
@@ -175,7 +176,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish(&exec_ctx);
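(Annotation, not part of the diff.) The vtable edits in this file and in server.c below restore the get_peer slot that sits between destroy_channel_elem and get_channel_info in grpc_channel_filter at this revision. The lame filter answers with NULL; a filter with nothing to add can instead defer to the next element. A minimal sketch under that assumption (the filter name is illustrative):

    #include "src/core/lib/channel/channel_stack.h"

    /* Pass-through get_peer: defer to the next filter in the call stack. */
    static char *passthru_get_peer(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem) {
      return grpc_call_next_get_peer(exec_ctx, elem);
    }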
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 8582d826ca..66dcc299aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -789,6 +789,7 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -961,6 +962,7 @@ const grpc_channel_filter grpc_server_top_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server",
};