diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.cc | 55 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/channel.cc | 7 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 1 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 15 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 16 | ||||
-rw-r--r-- | src/core/lib/surface/version.cc | 2 |
7 files changed, 66 insertions, 35 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 9a9113643d..86e0afa6ee 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -67,6 +67,9 @@ #define MAX_SEND_EXTRA_METADATA_COUNT 3 +// Used to create arena for the first call. +#define ESTIMATED_MDELEM_COUNT 16 + /* Status data for a request can come from several sources; this enumerates them all, and acts as a priority sorting for which status to return to the application - earlier entries override @@ -323,6 +326,11 @@ static parent_call* get_parent_call(grpc_call* call) { return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm); } +size_t grpc_call_get_initial_size_estimate() { + return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES + + sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT; +} + grpc_error* grpc_call_create(const grpc_call_create_args* args, grpc_call** out_call) { GPR_TIMER_SCOPE("grpc_call_create", 0); @@ -610,7 +618,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { // This is called via the call combiner to start sending a batch down // the filter stack. static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) { - GPR_TIMER_SCOPE("execute_batch", 0); + GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0); grpc_transport_stream_op_batch* batch = static_cast<grpc_transport_stream_op_batch*>(arg); grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg); @@ -747,10 +755,10 @@ static void get_final_status( status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); } if (grpc_call_error_trace.enabled()) { - gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); + gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR"); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { - gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error)); + gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error)); } } } @@ -1124,7 +1132,7 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { return !(flags & invalid_positions); } -static int batch_slot_for_op(grpc_op_type type) { +static size_t batch_slot_for_op(grpc_op_type type) { switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: return 0; @@ -1144,20 +1152,23 @@ static int batch_slot_for_op(grpc_op_type type) { GPR_UNREACHABLE_CODE(return 123456789); } -static batch_control* allocate_batch_control(grpc_call* call, - const grpc_op* ops, - size_t num_ops) { - int slot = batch_slot_for_op(ops[0].op); - batch_control** pslot = &call->active_batches[slot]; - if (*pslot == nullptr) { - *pslot = static_cast<batch_control*>( +static batch_control* reuse_or_allocate_batch_control(grpc_call* call, + const grpc_op* ops, + size_t num_ops) { + size_t slot_idx = batch_slot_for_op(ops[0].op); + batch_control** pslot = &call->active_batches[slot_idx]; + batch_control* bctl; + if (*pslot != nullptr) { + bctl = *pslot; + if (bctl->call != nullptr) { + return nullptr; + } + memset(bctl, 0, sizeof(*bctl)); + } else { + bctl = static_cast<batch_control*>( gpr_arena_alloc(call->arena, sizeof(batch_control))); + *pslot = bctl; } - batch_control* bctl = *pslot; - if (bctl->call != nullptr) { - return nullptr; - } - memset(bctl, 0, sizeof(*bctl)); bctl->call = call; bctl->op.payload = &call->stream_op_payload; return bctl; @@ -1259,8 +1270,12 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = nullptr; - GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + /* This closure may be meant to be run within some combiner. Since we aren't + * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead + * of GRPC_CLOSURE_RUN. + */ + GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs bctl->error */ @@ -1539,7 +1554,7 @@ static void free_no_op_completion(void* p, grpc_cq_completion* completion) { static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t nops, void* notify_tag, int is_notify_tag_closure) { - GPR_TIMER_SCOPE("grpc_call_start_batch", 0); + GPR_TIMER_SCOPE("call_start_batch", 0); size_t i; const grpc_op* op; @@ -1565,7 +1580,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, goto done; } - bctl = allocate_batch_control(call, ops, nops); + bctl = reuse_or_allocate_batch_control(call, ops, nops); if (bctl == nullptr) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 793cce4efa..e000f13e7d 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -98,6 +98,11 @@ void* grpc_call_context_get(grpc_call* call, grpc_context_index elem); uint8_t grpc_call_is_client(grpc_call* call); +/* Get the estimated memory size for a call BESIDES the call stack. Combined + * with the size of the call stack, it helps estimate the arena size for the + * initial call. */ +size_t grpc_call_get_initial_size_estimate(); + /* Return an appropriate compression algorithm for the requested compression \a * level in the context of \a call. */ grpc_compression_algorithm grpc_call_compression_for_level( diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index d740ebd411..a466b325be 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -72,10 +72,6 @@ struct grpc_channel { }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1)) -#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ - (((grpc_channel*)(channel_stack)) - 1) -#define CHANNEL_FROM_TOP_ELEM(top_elem) \ - CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) static void destroy_channel(void* arg, grpc_error* error); @@ -112,7 +108,8 @@ grpc_channel* grpc_channel_create_with_builder( gpr_atm_no_barrier_store( &channel->call_size_estimate, - (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); + (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size + + grpc_call_get_initial_size_estimate()); grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index d0363917b0..f751741712 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -390,7 +390,6 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { grpc_cq_completion* c = nullptr; - grpc_core::ExecCtx exec_ctx; if (gpr_spinlock_trylock(&q->queue_lock)) { GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index bd436d6857..16be81e9c2 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -27,13 +27,12 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/channel_trace_registry.h" +#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr/fork.h" -#include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/gprpp/fork.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/combiner.h" @@ -65,12 +64,10 @@ static int g_initializations; static void do_basic_init(void) { gpr_log_verbosity_init(); - grpc_fork_support_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); grpc_cq_global_init(); g_initializations = 0; - grpc_fork_handlers_auto_register(); } static bool append_filter(grpc_channel_stack_builder* builder, void* arg) { @@ -123,13 +120,14 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { + grpc_core::Fork::GlobalInit(); + grpc_fork_handlers_auto_register(); gpr_time_init(); - grpc_core::Thread::Init(); grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); - grpc_channel_trace_registry_init(); + grpc_core::ChannelzRegistry::Init(); grpc_security_pre_init(); grpc_core::ExecCtx::GlobalInit(); grpc_iomgr_init(); @@ -178,8 +176,9 @@ void grpc_shutdown(void) { grpc_mdctx_global_shutdown(); grpc_handshaker_factory_registry_shutdown(); grpc_slice_intern_shutdown(); - grpc_channel_trace_registry_shutdown(); + grpc_core::ChannelzRegistry::Shutdown(); grpc_stats_shutdown(); + grpc_core::Fork::GlobalShutdown(); } grpc_core::ExecCtx::GlobalShutdown(); } diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index f7505c888e..cb34def740 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1161,6 +1161,22 @@ static void listener_destroy_done(void* s, grpc_error* error) { gpr_mu_unlock(&server->mu_global); } +/* + - Kills all pending requests-for-incoming-RPC-calls (i.e the requests made via + grpc_server_request_call and grpc_server_request_registered call will now be + cancelled). See 'kill_pending_work_locked()' + + - Shuts down the listeners (i.e the server will no longer listen on the port + for new incoming channels). + + - Iterates through all channels on the server and sends shutdown msg (see + 'channel_broadcaster_shutdown()' for details) to the clients via the + transport layer. The transport layer then guarantees the following: + -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY) + -- If the server has outstanding calls that are in the process, the + connection is NOT closed until the server is done with all those calls + -- Once, there are no more calls in progress, the channel is closed + */ void grpc_server_shutdown_and_notify(grpc_server* server, grpc_completion_queue* cq, void* tag) { listener* l; diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index a712e10037..306b7c395e 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -25,4 +25,4 @@ const char* grpc_version_string(void) { return "6.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "glorious"; } +const char* grpc_g_stands_for(void) { return "gloriosa"; } |