aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.cc17
-rw-r--r--src/core/lib/surface/completion_queue.cc38
-rw-r--r--src/core/lib/surface/completion_queue.h19
-rw-r--r--src/core/lib/surface/completion_queue_factory.cc6
-rw-r--r--src/core/lib/surface/server.cc42
5 files changed, 70 insertions, 52 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 11b438f5dc..a9349afa68 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -207,7 +207,7 @@ struct grpc_call {
grpc_server* server;
} server;
} final_op;
- grpc_error* status_error;
+ gpr_atm status_error;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@@ -519,10 +519,12 @@ static void destroy_call(void* call, grpc_error* error) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- grpc_error_get_status(c->status_error, c->send_deadline,
+ grpc_error* status_error =
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
+ grpc_error_get_status(status_error, c->send_deadline,
&c->final_info.final_status, nullptr, nullptr,
&(c->final_info.error_string));
- GRPC_ERROR_UNREF(c->status_error);
+ GRPC_ERROR_UNREF(status_error);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -705,7 +707,7 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
call->final_op.client.error_string);
// explicitly take a ref
grpc_slice_ref_internal(*call->final_op.client.status_details);
- call->status_error = error;
+ gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
@@ -717,7 +719,9 @@ static void set_final_status(grpc_call* call, grpc_error* error) {
}
} else {
*call->final_op.server.cancelled =
- error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
+ error != GRPC_ERROR_NONE ||
+ reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
+ GRPC_ERROR_NONE;
grpc_core::channelz::ServerNode* channelz_server =
grpc_server_get_channelz_node(call->final_op.server.server);
if (channelz_server != nullptr) {
@@ -1686,7 +1690,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
}
- call->status_error = status_error;
+ gpr_atm_rel_store(&call->status_error,
+ reinterpret_cast<gpr_atm>(status_error));
if (!prepare_application_metadata(
call,
static_cast<int>(
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 0769d9e4f6..5dc9991f70 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback);
+ void (*init)(void* data,
+ grpc_experimental_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@@ -267,7 +268,7 @@ typedef struct cq_callback_data {
bool shutdown_called;
/** A callback that gets invoked when the CQ completes shutdown */
- grpc_core::CQCallbackInterface* shutdown_callback;
+ grpc_experimental_completion_queue_functor* shutdown_callback;
} cq_callback_data;
/* Completion queue structure */
@@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_callback(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_callback(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
@@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+ grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) {
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) {
}
static void cq_init_callback(
- void* data, grpc_core::CQCallbackInterface* shutdown_callback) {
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -859,7 +860,8 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
- (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success);
+ auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
+ (*functor->functor_run)(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
- callback->Run(true);
+ (*callback->functor_run)(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
@@ -1364,9 +1366,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
+ } else {
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index a7c524d8e8..d60fe6d6ef 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -48,23 +48,6 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
-/// For callback CQs, the tag that is passed in for an operation must
-/// actually be a pointer to an implementation of the following class.
-/// When the operation completes, the tag will be typecasted from void*
-/// to grpc_core::CQCallbackInterface* and then the Run method will be
-/// invoked on it. In practice, the language binding (e.g., C++ API
-/// implementation) is responsible for providing and using an implementation
-/// of this abstract base class.
-namespace grpc_core {
-class CQCallbackInterface {
- public:
- virtual ~CQCallbackInterface() {}
- virtual void Run(bool) GRPC_ABSTRACT;
-
- GRPC_ABSTRACT_BASE_CLASS
-};
-} // namespace grpc_core
-
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
const char* file, int line);
@@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc);
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback);
+ grpc_experimental_completion_queue_functor* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc
index ed92dd7eba..2616c156e4 100644
--- a/src/core/lib/surface/completion_queue_factory.cc
+++ b/src/core/lib/surface/completion_queue_factory.cc
@@ -31,8 +31,7 @@ static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr) {
return grpc_completion_queue_create_internal(
- attr->cq_completion_type, attr->cq_polling_type,
- static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb));
+ attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
}
grpc_completion_queue* grpc_completion_queue_create_for_callback(
- void* shutdown_callback, void* reserved) {
+ grpc_experimental_completion_queue_functor* shutdown_callback,
+ void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {
2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 5fa58ffdec..72ddc2648d 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -150,12 +150,15 @@ struct call_data {
grpc_closure kill_zombie_closure;
grpc_closure* on_done_recv_initial_metadata;
grpc_closure recv_trailing_metadata_ready;
- grpc_error* error;
+ grpc_error* recv_initial_metadata_error;
grpc_closure* original_recv_trailing_metadata_ready;
+ grpc_error* recv_trailing_metadata_error;
+ bool seen_recv_trailing_metadata_ready;
grpc_closure publish;
call_data* pending_next;
+ grpc_call_combiner* call_combiner;
};
struct request_matcher {
@@ -727,21 +730,43 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
if (calld->host_set && calld->path_set) {
/* do nothing */
} else {
+ /* Pass the error reference to calld->recv_initial_metadata_error */
grpc_error* src_error = error;
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Missing :authority or :path", &error, 1);
+ "Missing :authority or :path", &src_error, 1);
GRPC_ERROR_UNREF(src_error);
+ calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
}
-
- GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
+ grpc_closure* closure = calld->on_done_recv_initial_metadata;
+ calld->on_done_recv_initial_metadata = nullptr;
+ if (calld->seen_recv_trailing_metadata_ready) {
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &calld->recv_trailing_metadata_ready,
+ calld->recv_trailing_metadata_error,
+ "continue server_recv_trailing_metadata_ready");
+ }
+ GRPC_CLOSURE_RUN(closure, error);
}
static void server_recv_trailing_metadata_ready(void* user_data,
- grpc_error* err) {
+ grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
- err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
+ if (calld->on_done_recv_initial_metadata != nullptr) {
+ calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
+ calld->seen_recv_trailing_metadata_ready = true;
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ server_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+ "deferring server_recv_trailing_metadata_ready "
+ "until after server_on_recv_initial_metadata");
+ return;
+ }
+ error =
+ grpc_error_add_child(GRPC_ERROR_REF(error),
+ GRPC_ERROR_REF(calld->recv_initial_metadata_error));
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
static void server_mutate_op(grpc_call_element* elem,
@@ -845,6 +870,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
memset(calld, 0, sizeof(call_data));
calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem);
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
@@ -863,7 +889,7 @@ static void destroy_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
GPR_ASSERT(calld->state != PENDING);
- GRPC_ERROR_UNREF(calld->error);
+ GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
if (calld->host_set) {
grpc_slice_unref_internal(calld->host);
}