diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2016-04-08 00:46:15 +0200 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2016-04-08 00:46:15 +0200 |
commit | bdf80ac31f407d7886acd6a0fe2fb1ab4c43c6c9 (patch) | |
tree | fad0c4c08c6104fb85cb3539a48cd90ff706d9f8 /src/core/lib/surface | |
parent | 6415043781e0b73008b3db539f18354fe776241a (diff) | |
parent | d6ac251d4be334d7e13cfedb6181c161fdf05c41 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into gpr_malloc_is_all
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 41 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 32 | ||||
-rw-r--r-- | src/core/lib/surface/channel.h | 1 | ||||
-rw-r--r-- | src/core/lib/surface/channel_connectivity.c | 207 | ||||
-rw-r--r-- | src/core/lib/surface/channel_init.c | 16 | ||||
-rw-r--r-- | src/core/lib/surface/channel_init.h | 9 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 69 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 267 |
8 files changed, 228 insertions, 414 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 37cc724b53..6581bbd3d1 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -81,11 +81,11 @@ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, - /* Status was created by some internal channel stack operation */ - STATUS_FROM_CORE, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, + /* Status was created by some internal channel stack operation */ + STATUS_FROM_CORE, /* Status came from the server sending status */ STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT @@ -1074,24 +1074,29 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_initial_filter, call); - call->has_initial_md_been_received = true; - - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); + if (!success) { + bctl->success = false; + } else { + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_initial_filter, call); + + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); + set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); + } } + call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); - grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, - call->saved_receiving_stream_ready_ctx.success, NULL); + grpc_exec_ctx_enqueue( + exec_ctx, saved_rsr_closure, + call->saved_receiving_stream_ready_ctx.success && success, NULL); call->saved_receiving_stream_ready_ctx.bctlp = NULL; } @@ -1110,6 +1115,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { gpr_mu_lock(&call->mu); if (bctl->send_initial_metadata) { + if (!success) { + set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE); + } grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } @@ -1232,8 +1240,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->metadata_batch[0][0].deadline = call->send_deadline; stream_op.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op.send_idempotent_request = - (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0; + stream_op.send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 06f991b085..b6b760b5d8 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -40,7 +40,6 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/client_config/resolver_registry.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" @@ -84,14 +83,26 @@ struct grpc_channel { static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success); grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, - const grpc_channel_args *args, + const grpc_channel_args *input_args, grpc_channel_stack_type channel_stack_type, grpc_transport *optional_transport) { bool is_client = grpc_channel_stack_type_is_client(channel_stack_type); - grpc_channel *channel = grpc_channel_init_create_stack( - exec_ctx, channel_stack_type, sizeof(grpc_channel), args, 1, - destroy_channel, NULL, optional_transport); + grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_channel_arguments(builder, input_args); + grpc_channel_stack_builder_set_target(builder, target); + grpc_channel_stack_builder_set_transport(builder, optional_transport); + grpc_channel *channel; + grpc_channel_args *args; + if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) { + grpc_channel_stack_builder_destroy(builder); + return NULL; + } else { + args = grpc_channel_args_copy( + grpc_channel_stack_builder_get_channel_arguments(builder)); + channel = grpc_channel_stack_builder_finish( + exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL); + } memset(channel, 0, sizeof(*channel)); channel->target = gpr_strdup(target); @@ -142,16 +153,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, } } } - } - - if (channel->is_client && channel->default_authority == NULL && - target != NULL) { - char *default_authority = grpc_get_default_authority(target); - if (default_authority) { - channel->default_authority = - grpc_mdelem_from_strings(":authority", default_authority); - } - gpr_free(default_authority); + grpc_channel_args_destroy(args); } return channel; diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 640fd7e137..22dae930e4 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -35,7 +35,6 @@ #define GRPC_CORE_LIB_SURFACE_CHANNEL_H #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/client_config/subchannel_factory.h" #include "src/core/lib/surface/channel_stack_type.h" grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, diff --git a/src/core/lib/surface/channel_connectivity.c b/src/core/lib/surface/channel_connectivity.c deleted file mode 100644 index 9a9ee422c2..0000000000 --- a/src/core/lib/surface/channel_connectivity.c +++ /dev/null @@ -1,207 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/surface/channel.h" - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -#include "src/core/lib/channel/client_channel.h" -#include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/surface/api_trace.h" -#include "src/core/lib/surface/completion_queue.h" - -grpc_connectivity_state grpc_channel_check_connectivity_state( - grpc_channel *channel, int try_to_connect) { - /* forward through to the underlying client channel */ - grpc_channel_element *client_channel_elem = - grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_connectivity_state state; - GRPC_API_TRACE( - "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2, - (channel, try_to_connect)); - if (client_channel_elem->filter == &grpc_client_channel_filter) { - state = grpc_client_channel_check_connectivity_state( - &exec_ctx, client_channel_elem, try_to_connect); - grpc_exec_ctx_finish(&exec_ctx); - return state; - } - gpr_log(GPR_ERROR, - "grpc_channel_check_connectivity_state called on something that is " - "not a (u)client channel, but '%s'", - client_channel_elem->filter->name); - grpc_exec_ctx_finish(&exec_ctx); - return GRPC_CHANNEL_FATAL_FAILURE; -} - -typedef enum { - WAITING, - CALLING_BACK, - CALLING_BACK_AND_FINISHED, - CALLED_BACK -} callback_phase; - -typedef struct { - gpr_mu mu; - callback_phase phase; - int success; - grpc_closure on_complete; - grpc_timer alarm; - grpc_connectivity_state state; - grpc_completion_queue *cq; - grpc_cq_completion completion_storage; - grpc_channel *channel; - void *tag; -} state_watcher; - -static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { - grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - if (client_channel_elem->filter == &grpc_client_channel_filter) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, - "watch_channel_connectivity"); - } else { - abort(); - } - gpr_mu_destroy(&w->mu); - gpr_free(w); -} - -static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, - grpc_cq_completion *ignored) { - int delete = 0; - state_watcher *w = pw; - gpr_mu_lock(&w->mu); - switch (w->phase) { - case WAITING: - case CALLED_BACK: - GPR_UNREACHABLE_CODE(return ); - case CALLING_BACK: - w->phase = CALLED_BACK; - break; - case CALLING_BACK_AND_FINISHED: - delete = 1; - break; - } - gpr_mu_unlock(&w->mu); - - if (delete) { - delete_state_watcher(exec_ctx, w); - } -} - -static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, - int due_to_completion) { - int delete = 0; - - if (due_to_completion) { - grpc_timer_cancel(exec_ctx, &w->alarm); - } - - gpr_mu_lock(&w->mu); - if (due_to_completion) { - w->success = 1; - } - switch (w->phase) { - case WAITING: - w->phase = CALLING_BACK; - grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion, - w, &w->completion_storage); - break; - case CALLING_BACK: - w->phase = CALLING_BACK_AND_FINISHED; - break; - case CALLING_BACK_AND_FINISHED: - GPR_UNREACHABLE_CODE(return ); - case CALLED_BACK: - delete = 1; - break; - } - gpr_mu_unlock(&w->mu); - - if (delete) { - delete_state_watcher(exec_ctx, w); - } -} - -static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { - partly_done(exec_ctx, pw, 1); -} - -static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) { - partly_done(exec_ctx, pw, 0); -} - -void grpc_channel_watch_connectivity_state( - grpc_channel *channel, grpc_connectivity_state last_observed_state, - gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { - grpc_channel_element *client_channel_elem = - grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - state_watcher *w = gpr_malloc(sizeof(*w)); - - GRPC_API_TRACE( - "grpc_channel_watch_connectivity_state(" - "channel=%p, last_observed_state=%d, " - "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, " - "cq=%p, tag=%p)", - 7, (channel, (int)last_observed_state, (long long)deadline.tv_sec, - (int)deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); - - grpc_cq_begin_op(cq, tag); - - gpr_mu_init(&w->mu); - grpc_closure_init(&w->on_complete, watch_complete, w); - w->phase = WAITING; - w->state = last_observed_state; - w->success = 0; - w->cq = cq; - w->tag = tag; - w->channel = channel; - - grpc_timer_init(&exec_ctx, &w->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); - - if (client_channel_elem->filter == &grpc_client_channel_filter) { - GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq), &w->state, - &w->on_complete); - } else { - abort(); - } - - grpc_exec_ctx_finish(&exec_ctx); -} diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c index fc69f61f77..0627b34479 100644 --- a/src/core/lib/surface/channel_init.c +++ b/src/core/lib/surface/channel_init.c @@ -122,25 +122,19 @@ static const char *name_for_type(grpc_channel_stack_type type) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -void *grpc_channel_init_create_stack( - grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes, - const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy, - void *destroy_arg, grpc_transport *transport) { +bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, + grpc_channel_stack_type type) { GPR_ASSERT(g_finalized); - grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_name(builder, name_for_type(type)); - grpc_channel_stack_builder_set_channel_arguments(builder, args); - grpc_channel_stack_builder_set_transport(builder, transport); for (size_t i = 0; i < g_slots[type].num_slots; i++) { const stage_slot *slot = &g_slots[type].slots[i]; if (!slot->fn(builder, slot->arg)) { - grpc_channel_stack_builder_destroy(builder); - return NULL; + return false; } } - return grpc_channel_stack_builder_finish(exec_ctx, builder, prefix_bytes, - initial_refs, destroy, destroy_arg); + return true; } diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h index a4d8271ca6..3a18a61ddb 100644 --- a/src/core/lib/surface/channel_init.h +++ b/src/core/lib/surface/channel_init.h @@ -38,6 +38,8 @@ #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/transport/transport.h" +#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000 + /// This module provides a way for plugins (and the grpc core library itself) /// to register mutators for channel stacks. /// It also provides a universal entry path to run those mutators to build @@ -78,9 +80,8 @@ void grpc_channel_init_shutdown(void); /// \a optional_transport is either NULL or a constructed transport object /// Returns a pointer to the base of the memory allocated (the actual channel /// stack object will be prefix_bytes past that pointer) -void *grpc_channel_init_create_stack( - grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes, - const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy, - void *destroy_arg, grpc_transport *optional_transport); +bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, + grpc_channel_stack_type type); #endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_INIT_H */ diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index aca4ce9d07..ec75af6e06 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -39,17 +39,11 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/time.h> -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/client_channel.h" #include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/client_config/lb_policy_registry.h" -#include "src/core/lib/client_config/resolver_registry.h" -#include "src/core/lib/client_config/subchannel.h" -#include "src/core/lib/client_config/subchannel_index.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" @@ -65,10 +59,6 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" -#ifndef GRPC_DEFAULT_NAME_PREFIX -#define GRPC_DEFAULT_NAME_PREFIX "dns:///" -#endif - /* (generated) built in registry of plugins */ extern void grpc_register_built_in_plugins(void); @@ -105,31 +95,35 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder, } static void register_builtin_channel_init() { - grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, prepend_filter, - (void *)&grpc_compress_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, - prepend_filter, - (void *)&grpc_compress_filter); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter, - (void *)&grpc_compress_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX, - maybe_add_http_filter, - (void *)&grpc_http_client_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX, + grpc_channel_init_register_stage( + GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, + (void *)&grpc_compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + prepend_filter, (void *)&grpc_compress_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, + (void *)&grpc_compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_http_filter, (void *)&grpc_http_client_filter); + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter, NULL); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, - maybe_add_http_filter, - (void *)&grpc_http_client_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_http_filter, (void *)&grpc_http_client_filter); + grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter, NULL); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_http_filter, - (void *)&grpc_http_server_filter); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_http_filter, (void *)&grpc_http_server_filter); + grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter, NULL); - grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter, - (void *)&grpc_client_channel_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL, INT_MAX, + grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter, (void *)&grpc_lame_filter); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter, (void *)&grpc_server_top_filter); @@ -161,12 +155,8 @@ void grpc_init(void) { gpr_time_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); - grpc_lb_policy_registry_init(); - grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX); grpc_register_tracer("api", &grpc_api_trace); grpc_register_tracer("channel", &grpc_trace_channel); - grpc_register_tracer("http", &grpc_http_trace); - grpc_register_tracer("flowctl", &grpc_flowctl_trace); grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); @@ -176,7 +166,6 @@ void grpc_init(void) { grpc_tracer_init("GRPC_TRACE"); gpr_timers_global_init(); grpc_cq_global_init(); - grpc_subchannel_index_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -201,17 +190,13 @@ void grpc_shutdown(void) { grpc_executor_shutdown(); grpc_cq_global_shutdown(); grpc_iomgr_shutdown(); - grpc_subchannel_index_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); - grpc_resolver_registry_shutdown(); - grpc_lb_policy_registry_shutdown(); - for (i = 0; i < g_number_of_plugins; i++) { + for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != NULL) { g_all_of_the_plugins[i].destroy(); } } - grpc_channel_init_shutdown(); grpc_mdctx_global_shutdown(); } gpr_mu_unlock(&g_init_mu); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 1898bee1c1..ad8ee8c7a9 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -95,7 +95,6 @@ typedef struct requested_call { grpc_byte_buffer **optional_payload; } registered; } data; - grpc_closure publish; } requested_call; typedef struct channel_registered_method { @@ -156,15 +155,21 @@ struct call_data { bool recv_idempotent_request; grpc_metadata_array initial_metadata; + request_matcher *request_matcher; + grpc_byte_buffer *payload; + grpc_closure got_initial_metadata; grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; grpc_closure *on_done_recv_initial_metadata; + grpc_closure publish; + call_data *pending_next; }; struct request_matcher { + grpc_server *server; call_data *pending_head; call_data *pending_tail; gpr_stack_lockfree *requests; @@ -173,6 +178,7 @@ struct request_matcher { struct registered_method { char *method; char *host; + grpc_server_register_method_payload_handling payload_handling; uint32_t flags; request_matcher request_matcher; registered_method *next; @@ -226,8 +232,7 @@ struct grpc_server { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) -static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - call_data *calld, requested_call *rc); +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, requested_call *rc); /* Before calling maybe_finish_shutdown, we must hold mu_global and not @@ -303,8 +308,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, size_t entries) { +static void request_matcher_init(request_matcher *rm, size_t entries, + grpc_server *server) { memset(rm, 0, sizeof(*rm)); + rm->server = server; rm->requests = gpr_stack_lockfree_create(entries); } @@ -417,21 +424,90 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { &op); } -static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, - grpc_call_element *elem, request_matcher *rm) { - call_data *calld = elem->call_data; - int request_id; +static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { + gpr_slice slice = value->slice; + size_t len = GPR_SLICE_LENGTH(slice); - if (gpr_atm_acq_load(&server->shutdown_flag)) { + if (len + 1 > *capacity) { + *capacity = GPR_MAX(len + 1, *capacity * 2); + *dest = gpr_realloc(*dest, *capacity); + } + memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); +} + +static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, + grpc_cq_completion *c) { + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls && + rc < server->requested_calls + server->max_requested_calls) { + GPR_ASSERT(rc - server->requested_calls <= INT_MAX); + gpr_stack_lockfree_push(server->request_freelist, + (int)(rc - server->requested_calls)); + } else { + gpr_free(req); + } + + server_unref(exec_ctx, server); +} + +static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, + call_data *calld, requested_call *rc) { + grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); + grpc_call *call = calld->call; + *rc->call = call; + calld->cq_new = rc->cq_for_notification; + GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); + switch (rc->type) { + case BATCH_CALL: + GPR_ASSERT(calld->host != NULL); + GPR_ASSERT(calld->path != NULL); + cpstr(&rc->data.batch.details->host, + &rc->data.batch.details->host_capacity, calld->host); + cpstr(&rc->data.batch.details->method, + &rc->data.batch.details->method_capacity, calld->path); + rc->data.batch.details->deadline = calld->deadline; + rc->data.batch.details->flags = + 0 | (calld->recv_idempotent_request + ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST + : 0); + break; + case REGISTERED_CALL: + *rc->data.registered.deadline = calld->deadline; + if (rc->data.registered.optional_payload) { + *rc->data.registered.optional_payload = calld->payload; + } + break; + default: + GPR_UNREACHABLE_CODE(return ); + } + + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + channel_data *chand = elem->channel_data; + server_ref(chand->server); + grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc, + &rc->completion); +} + +static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + call_data *calld = arg; + request_matcher *rm = calld->request_matcher; + grpc_server *server = rm->server; + + if (!success || gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); return; } - request_id = gpr_stack_lockfree_pop(rm->requests); + int request_id = gpr_stack_lockfree_pop(rm->requests); if (request_id == -1) { gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -449,7 +525,41 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]); + publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]); + } +} + +static void finish_start_new_rpc( + grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem, + request_matcher *rm, + grpc_server_register_method_payload_handling payload_handling) { + call_data *calld = elem->call_data; + + if (gpr_atm_acq_load(&server->shutdown_flag)) { + gpr_mu_lock(&calld->mu_state); + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL); + return; + } + + calld->request_matcher = rm; + + switch (payload_handling) { + case GRPC_SRM_PAYLOAD_NONE: + publish_new_rpc(exec_ctx, calld, true); + break; + case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_MESSAGE; + op.data.recv_message = &calld->payload; + grpc_closure_init(&calld->publish, publish_new_rpc, calld); + grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, + &calld->publish); + break; + } } } @@ -475,7 +585,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { !calld->recv_idempotent_request) continue; finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher); + &rm->server_registered_method->request_matcher, + rm->server_registered_method->payload_handling); return; } /* check for a wildcard method definition (no host set) */ @@ -490,12 +601,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { !calld->recv_idempotent_request) continue; finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher); + &rm->server_registered_method->request_matcher, + rm->server_registered_method->payload_handling); return; } } finish_start_new_rpc(exec_ctx, server, elem, - &server->unregistered_request_matcher); + &server->unregistered_request_matcher, + GRPC_SRM_PAYLOAD_NONE); } static int num_listeners(grpc_server *server) { @@ -572,10 +685,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (md->key == GRPC_MDSTR_PATH) { - calld->path = GRPC_MDSTR_REF(md->value); + if (calld->path == NULL) { + calld->path = GRPC_MDSTR_REF(md->value); + } return NULL; } else if (md->key == GRPC_MDSTR_AUTHORITY) { - calld->host = GRPC_MDSTR_REF(md->value); + if (calld->host == NULL) { + calld->host = GRPC_MDSTR_REF(md->value); + } return NULL; } return md; @@ -824,7 +941,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { gpr_stack_lockfree_push(server->request_freelist, (int)i); } request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls); + server->max_requested_calls, server); server->requested_calls = gpr_malloc(server->max_requested_calls * sizeof(*server->requested_calls)); @@ -840,8 +957,10 @@ static int streq(const char *a, const char *b) { return 0 == strcmp(a, b); } -void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, uint32_t flags) { +void *grpc_server_register_method( + grpc_server *server, const char *method, const char *host, + grpc_server_register_method_payload_handling payload_handling, + uint32_t flags) { registered_method *m; GRPC_API_TRACE( "grpc_server_register_method(server=%p, method=%s, host=%s, " @@ -866,10 +985,12 @@ void *grpc_server_register_method(grpc_server *server, const char *method, } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); - request_matcher_init(&m->request_matcher, server->max_requested_calls); + request_matcher_init(&m->request_matcher, server->max_requested_calls, + server); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; + m->payload_handling = payload_handling; m->flags = flags; server->registered_methods = m; return m; @@ -1143,8 +1264,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(exec_ctx, server, calld, - &server->requested_calls[request_id]); + publish_call(exec_ctx, server, calld, + &server->requested_calls[request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1209,6 +1330,12 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } + if ((optional_payload == NULL) != + (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) { + gpr_free(rc); + error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; + goto done; + } grpc_cq_begin_op(cq_for_notification, tag); rc->type = REGISTERED_CALL; rc->server = server; @@ -1226,86 +1353,6 @@ done: return error; } -static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - void *user_data, bool success); - -static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { - gpr_slice slice = value->slice; - size_t len = GPR_SLICE_LENGTH(slice); - - if (len + 1 > *capacity) { - *capacity = GPR_MAX(len + 1, *capacity * 2); - *dest = gpr_realloc(*dest, *capacity); - } - memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); -} - -static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - call_data *calld, requested_call *rc) { - grpc_op ops[1]; - grpc_op *op = ops; - - memset(ops, 0, sizeof(ops)); - - /* called once initial metadata has been read by the call, but BEFORE - the ioreq to fetch it out of the call has been executed. - This means metadata related fields can be relied on in calld, but to - fill in the metadata array passed by the client, we need to perform - an ioreq op, that should complete immediately. */ - - grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); - grpc_closure_init(&rc->publish, publish_registered_or_batch, rc); - *rc->call = calld->call; - calld->cq_new = rc->cq_for_notification; - GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); - switch (rc->type) { - case BATCH_CALL: - GPR_ASSERT(calld->host != NULL); - GPR_ASSERT(calld->path != NULL); - cpstr(&rc->data.batch.details->host, - &rc->data.batch.details->host_capacity, calld->host); - cpstr(&rc->data.batch.details->method, - &rc->data.batch.details->method_capacity, calld->path); - rc->data.batch.details->deadline = calld->deadline; - rc->data.batch.details->flags = - 0 | (calld->recv_idempotent_request - ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST - : 0); - break; - case REGISTERED_CALL: - *rc->data.registered.deadline = calld->deadline; - if (rc->data.registered.optional_payload) { - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = rc->data.registered.optional_payload; - op++; - } - break; - default: - GPR_UNREACHABLE_CODE(return ); - } - - GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops, - (size_t)(op - ops), &rc->publish); -} - -static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, - grpc_cq_completion *c) { - requested_call *rc = req; - grpc_server *server = rc->server; - - if (rc >= server->requested_calls && - rc < server->requested_calls + server->max_requested_calls) { - GPR_ASSERT(rc - server->requested_calls <= INT_MAX); - gpr_stack_lockfree_push(server->request_freelist, - (int)(rc - server->requested_calls)); - } else { - gpr_free(req); - } - - server_unref(exec_ctx, server); -} - static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, requested_call *rc) { *rc->call = NULL; @@ -1316,20 +1363,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, done_request_event, rc, &rc->completion); } -static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc, - bool success) { - requested_call *rc = prc; - grpc_call *call = *rc->call; - grpc_call_element *elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - server_ref(chand->server); - grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event, - rc, &rc->completion); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server"); -} - const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { return server->channel_args; } |