diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.cc | 17 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 38 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 19 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue_factory.cc | 6 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 42 |
5 files changed, 70 insertions, 52 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 11b438f5dc..a9349afa68 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -207,7 +207,7 @@ struct grpc_call { grpc_server* server; } server; } final_op; - grpc_error* status_error; + gpr_atm status_error; /* recv_state can contain one of the following values: RECV_NONE : : no initial metadata and messages received @@ -519,10 +519,12 @@ static void destroy_call(void* call, grpc_error* error) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_error_get_status(c->status_error, c->send_deadline, + grpc_error* status_error = + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error)); + grpc_error_get_status(status_error, c->send_deadline, &c->final_info.final_status, nullptr, nullptr, &(c->final_info.error_string)); - GRPC_ERROR_UNREF(c->status_error); + GRPC_ERROR_UNREF(status_error); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -705,7 +707,7 @@ static void set_final_status(grpc_call* call, grpc_error* error) { call->final_op.client.error_string); // explicitly take a ref grpc_slice_ref_internal(*call->final_op.client.status_details); - call->status_error = error; + gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error)); grpc_core::channelz::ChannelNode* channelz_channel = grpc_channel_get_channelz_node(call->channel); if (channelz_channel != nullptr) { @@ -717,7 +719,9 @@ static void set_final_status(grpc_call* call, grpc_error* error) { } } else { *call->final_op.server.cancelled = - error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE; + error != GRPC_ERROR_NONE || + reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) != + GRPC_ERROR_NONE; grpc_core::channelz::ServerNode* channelz_server = grpc_server_get_channelz_node(call->final_op.server.server); if (channelz_server != nullptr) { @@ -1686,7 +1690,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, } } - call->status_error = status_error; + gpr_atm_rel_store(&call->status_error, + reinterpret_cast<gpr_atm>(status_error)); if (!prepare_application_metadata( call, static_cast<int>( diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0769d9e4f6..5dc9991f70 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; - void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); + void (*init)(void* data, + grpc_experimental_completion_queue_functor* shutdown_callback); void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); @@ -267,7 +268,7 @@ typedef struct cq_callback_data { bool shutdown_called; /** A callback that gets invoked when the CQ completes shutdown */ - grpc_core::CQCallbackInterface* shutdown_callback; + grpc_experimental_completion_queue_functor* shutdown_callback; } cq_callback_data; /* Completion queue structure */ @@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_callback(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_callback( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); @@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) { grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback) { + grpc_experimental_completion_queue_functor* shutdown_callback) { GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); grpc_completion_queue* cq; @@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal( return cq; } -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_next_data* cqd = static_cast<cq_next_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) { } static void cq_init_callback( - void* data, grpc_core::CQCallbackInterface* shutdown_callback) { + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_callback_data* cqd = static_cast<cq_callback_data*>(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -859,7 +860,8 @@ static void cq_end_op_for_callback( GRPC_ERROR_UNREF(error); - (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success); + auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag); + (*functor->functor_run)(functor, is_success); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - callback->Run(true); + (*callback->functor_run)(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { @@ -1364,9 +1366,11 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { } cqd->shutdown_called = true; if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + gpr_mu_unlock(cq->mu); cq_finish_shutdown_callback(cq); + } else { + gpr_mu_unlock(cq->mu); } - gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index a7c524d8e8..d60fe6d6ef 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -48,23 +48,6 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; -/// For callback CQs, the tag that is passed in for an operation must -/// actually be a pointer to an implementation of the following class. -/// When the operation completes, the tag will be typecasted from void* -/// to grpc_core::CQCallbackInterface* and then the Run method will be -/// invoked on it. In practice, the language binding (e.g., C++ API -/// implementation) is responsible for providing and using an implementation -/// of this abstract base class. -namespace grpc_core { -class CQCallbackInterface { - public: - virtual ~CQCallbackInterface() {} - virtual void Run(bool) GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS -}; -} // namespace grpc_core - #ifndef NDEBUG void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason, const char* file, int line); @@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc); grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback); + grpc_experimental_completion_queue_functor* shutdown_callback); #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc index ed92dd7eba..2616c156e4 100644 --- a/src/core/lib/surface/completion_queue_factory.cc +++ b/src/core/lib/surface/completion_queue_factory.cc @@ -31,8 +31,7 @@ static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attr) { return grpc_completion_queue_create_internal( - attr->cq_completion_type, attr->cq_polling_type, - static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb)); + attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { } grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved) { + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = { 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}; diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 5fa58ffdec..72ddc2648d 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -150,12 +150,15 @@ struct call_data { grpc_closure kill_zombie_closure; grpc_closure* on_done_recv_initial_metadata; grpc_closure recv_trailing_metadata_ready; - grpc_error* error; + grpc_error* recv_initial_metadata_error; grpc_closure* original_recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error; + bool seen_recv_trailing_metadata_ready; grpc_closure publish; call_data* pending_next; + grpc_call_combiner* call_combiner; }; struct request_matcher { @@ -727,21 +730,43 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { if (calld->host_set && calld->path_set) { /* do nothing */ } else { + /* Pass the error reference to calld->recv_initial_metadata_error */ grpc_error* src_error = error; error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Missing :authority or :path", &error, 1); + "Missing :authority or :path", &src_error, 1); GRPC_ERROR_UNREF(src_error); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } - - GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error); + grpc_closure* closure = calld->on_done_recv_initial_metadata; + calld->on_done_recv_initial_metadata = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START(calld->call_combiner, + &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, + "continue server_recv_trailing_metadata_ready"); + } + GRPC_CLOSURE_RUN(closure, error); } static void server_recv_trailing_metadata_ready(void* user_data, - grpc_error* err) { + grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); - err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error)); - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); + if (calld->on_done_recv_initial_metadata != nullptr) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, + server_recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring server_recv_trailing_metadata_ready " + "until after server_on_recv_initial_metadata"); + return; + } + error = + grpc_error_add_child(GRPC_ERROR_REF(error), + GRPC_ERROR_REF(calld->recv_initial_metadata_error)); + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } static void server_mutate_op(grpc_call_element* elem, @@ -845,6 +870,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem, memset(calld, 0, sizeof(call_data)); calld->deadline = GRPC_MILLIS_INF_FUTURE; calld->call = grpc_call_from_top_element(elem); + calld->call_combiner = args->call_combiner; GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, @@ -863,7 +889,7 @@ static void destroy_call_elem(grpc_call_element* elem, call_data* calld = static_cast<call_data*>(elem->call_data); GPR_ASSERT(calld->state != PENDING); - GRPC_ERROR_UNREF(calld->error); + GRPC_ERROR_UNREF(calld->recv_initial_metadata_error); if (calld->host_set) { grpc_slice_unref_internal(calld->host); } |