aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2016-04-08 00:46:15 +0200
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2016-04-08 00:46:15 +0200
commitbdf80ac31f407d7886acd6a0fe2fb1ab4c43c6c9 (patch)
treefad0c4c08c6104fb85cb3539a48cd90ff706d9f8 /src/core/lib/surface
parent6415043781e0b73008b3db539f18354fe776241a (diff)
parentd6ac251d4be334d7e13cfedb6181c161fdf05c41 (diff)
Merge branch 'master' of https://github.com/grpc/grpc into gpr_malloc_is_all
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c41
-rw-r--r--src/core/lib/surface/channel.c32
-rw-r--r--src/core/lib/surface/channel.h1
-rw-r--r--src/core/lib/surface/channel_connectivity.c207
-rw-r--r--src/core/lib/surface/channel_init.c16
-rw-r--r--src/core/lib/surface/channel_init.h9
-rw-r--r--src/core/lib/surface/init.c69
-rw-r--r--src/core/lib/surface/server.c267
8 files changed, 228 insertions, 414 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 37cc724b53..6581bbd3d1 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -81,11 +81,11 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
- /* Status was created by some internal channel stack operation */
- STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@@ -1074,24 +1074,29 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- grpc_metadata_batch *md =
- &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- grpc_metadata_batch_filter(md, recv_initial_filter, call);
- call->has_initial_md_been_received = true;
-
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
+ if (!success) {
+ bctl->success = false;
+ } else {
+ grpc_metadata_batch *md =
+ &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ grpc_metadata_batch_filter(md, recv_initial_filter, call);
+
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0 &&
+ !call->is_client) {
+ GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+ set_deadline_alarm(exec_ctx, call, md->deadline);
+ GPR_TIMER_END("set_deadline_alarm", 0);
+ }
}
+ call->has_initial_md_been_received = true;
if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
grpc_closure *saved_rsr_closure = grpc_closure_create(
receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
- grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
- call->saved_receiving_stream_ready_ctx.success, NULL);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, saved_rsr_closure,
+ call->saved_receiving_stream_ready_ctx.success && success, NULL);
call->saved_receiving_stream_ready_ctx.bctlp = NULL;
}
@@ -1110,6 +1115,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
+ if (!success) {
+ set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ }
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
@@ -1232,8 +1240,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_idempotent_request =
- (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 06f991b085..b6b760b5d8 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -40,7 +40,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
@@ -84,14 +83,26 @@ struct grpc_channel {
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
- const grpc_channel_args *args,
+ const grpc_channel_args *input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport) {
bool is_client = grpc_channel_stack_type_is_client(channel_stack_type);
- grpc_channel *channel = grpc_channel_init_create_stack(
- exec_ctx, channel_stack_type, sizeof(grpc_channel), args, 1,
- destroy_channel, NULL, optional_transport);
+ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder_set_channel_arguments(builder, input_args);
+ grpc_channel_stack_builder_set_target(builder, target);
+ grpc_channel_stack_builder_set_transport(builder, optional_transport);
+ grpc_channel *channel;
+ grpc_channel_args *args;
+ if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
+ grpc_channel_stack_builder_destroy(builder);
+ return NULL;
+ } else {
+ args = grpc_channel_args_copy(
+ grpc_channel_stack_builder_get_channel_arguments(builder));
+ channel = grpc_channel_stack_builder_finish(
+ exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL);
+ }
memset(channel, 0, sizeof(*channel));
channel->target = gpr_strdup(target);
@@ -142,16 +153,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
}
}
}
- }
-
- if (channel->is_client && channel->default_authority == NULL &&
- target != NULL) {
- char *default_authority = grpc_get_default_authority(target);
- if (default_authority) {
- channel->default_authority =
- grpc_mdelem_from_strings(":authority", default_authority);
- }
- gpr_free(default_authority);
+ grpc_channel_args_destroy(args);
}
return channel;
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 640fd7e137..22dae930e4 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -35,7 +35,6 @@
#define GRPC_CORE_LIB_SURFACE_CHANNEL_H
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
diff --git a/src/core/lib/surface/channel_connectivity.c b/src/core/lib/surface/channel_connectivity.c
deleted file mode 100644
index 9a9ee422c2..0000000000
--- a/src/core/lib/surface/channel_connectivity.c
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/surface/channel.h"
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/channel/client_channel.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/surface/api_trace.h"
-#include "src/core/lib/surface/completion_queue.h"
-
-grpc_connectivity_state grpc_channel_check_connectivity_state(
- grpc_channel *channel, int try_to_connect) {
- /* forward through to the underlying client channel */
- grpc_channel_element *client_channel_elem =
- grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_connectivity_state state;
- GRPC_API_TRACE(
- "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
- (channel, try_to_connect));
- if (client_channel_elem->filter == &grpc_client_channel_filter) {
- state = grpc_client_channel_check_connectivity_state(
- &exec_ctx, client_channel_elem, try_to_connect);
- grpc_exec_ctx_finish(&exec_ctx);
- return state;
- }
- gpr_log(GPR_ERROR,
- "grpc_channel_check_connectivity_state called on something that is "
- "not a (u)client channel, but '%s'",
- client_channel_elem->filter->name);
- grpc_exec_ctx_finish(&exec_ctx);
- return GRPC_CHANNEL_FATAL_FAILURE;
-}
-
-typedef enum {
- WAITING,
- CALLING_BACK,
- CALLING_BACK_AND_FINISHED,
- CALLED_BACK
-} callback_phase;
-
-typedef struct {
- gpr_mu mu;
- callback_phase phase;
- int success;
- grpc_closure on_complete;
- grpc_timer alarm;
- grpc_connectivity_state state;
- grpc_completion_queue *cq;
- grpc_cq_completion completion_storage;
- grpc_channel *channel;
- void *tag;
-} state_watcher;
-
-static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
- grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(w->channel));
- if (client_channel_elem->filter == &grpc_client_channel_filter) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
- "watch_channel_connectivity");
- } else {
- abort();
- }
- gpr_mu_destroy(&w->mu);
- gpr_free(w);
-}
-
-static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
- grpc_cq_completion *ignored) {
- int delete = 0;
- state_watcher *w = pw;
- gpr_mu_lock(&w->mu);
- switch (w->phase) {
- case WAITING:
- case CALLED_BACK:
- GPR_UNREACHABLE_CODE(return );
- case CALLING_BACK:
- w->phase = CALLED_BACK;
- break;
- case CALLING_BACK_AND_FINISHED:
- delete = 1;
- break;
- }
- gpr_mu_unlock(&w->mu);
-
- if (delete) {
- delete_state_watcher(exec_ctx, w);
- }
-}
-
-static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
- int due_to_completion) {
- int delete = 0;
-
- if (due_to_completion) {
- grpc_timer_cancel(exec_ctx, &w->alarm);
- }
-
- gpr_mu_lock(&w->mu);
- if (due_to_completion) {
- w->success = 1;
- }
- switch (w->phase) {
- case WAITING:
- w->phase = CALLING_BACK;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion,
- w, &w->completion_storage);
- break;
- case CALLING_BACK:
- w->phase = CALLING_BACK_AND_FINISHED;
- break;
- case CALLING_BACK_AND_FINISHED:
- GPR_UNREACHABLE_CODE(return );
- case CALLED_BACK:
- delete = 1;
- break;
- }
- gpr_mu_unlock(&w->mu);
-
- if (delete) {
- delete_state_watcher(exec_ctx, w);
- }
-}
-
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
- partly_done(exec_ctx, pw, 1);
-}
-
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
- partly_done(exec_ctx, pw, 0);
-}
-
-void grpc_channel_watch_connectivity_state(
- grpc_channel *channel, grpc_connectivity_state last_observed_state,
- gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
- grpc_channel_element *client_channel_elem =
- grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- state_watcher *w = gpr_malloc(sizeof(*w));
-
- GRPC_API_TRACE(
- "grpc_channel_watch_connectivity_state("
- "channel=%p, last_observed_state=%d, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
- "cq=%p, tag=%p)",
- 7, (channel, (int)last_observed_state, (long long)deadline.tv_sec,
- (int)deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
-
- grpc_cq_begin_op(cq, tag);
-
- gpr_mu_init(&w->mu);
- grpc_closure_init(&w->on_complete, watch_complete, w);
- w->phase = WAITING;
- w->state = last_observed_state;
- w->success = 0;
- w->cq = cq;
- w->tag = tag;
- w->channel = channel;
-
- grpc_timer_init(&exec_ctx, &w->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
-
- if (client_channel_elem->filter == &grpc_client_channel_filter) {
- GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
- grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq), &w->state,
- &w->on_complete);
- } else {
- abort();
- }
-
- grpc_exec_ctx_finish(&exec_ctx);
-}
diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c
index fc69f61f77..0627b34479 100644
--- a/src/core/lib/surface/channel_init.c
+++ b/src/core/lib/surface/channel_init.c
@@ -122,25 +122,19 @@ static const char *name_for_type(grpc_channel_stack_type type) {
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-void *grpc_channel_init_create_stack(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes,
- const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg, grpc_transport *transport) {
+bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
- grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_name(builder, name_for_type(type));
- grpc_channel_stack_builder_set_channel_arguments(builder, args);
- grpc_channel_stack_builder_set_transport(builder, transport);
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
const stage_slot *slot = &g_slots[type].slots[i];
if (!slot->fn(builder, slot->arg)) {
- grpc_channel_stack_builder_destroy(builder);
- return NULL;
+ return false;
}
}
- return grpc_channel_stack_builder_finish(exec_ctx, builder, prefix_bytes,
- initial_refs, destroy, destroy_arg);
+ return true;
}
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index a4d8271ca6..3a18a61ddb 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -38,6 +38,8 @@
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
+#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000
+
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
/// It also provides a universal entry path to run those mutators to build
@@ -78,9 +80,8 @@ void grpc_channel_init_shutdown(void);
/// \a optional_transport is either NULL or a constructed transport object
/// Returns a pointer to the base of the memory allocated (the actual channel
/// stack object will be prefix_bytes past that pointer)
-void *grpc_channel_init_create_stack(
- grpc_exec_ctx *exec_ctx, grpc_channel_stack_type type, size_t prefix_bytes,
- const grpc_channel_args *args, int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg, grpc_transport *optional_transport);
+bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ grpc_channel_stack_type type);
#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_INIT_H */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index aca4ce9d07..ec75af6e06 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -39,17 +39,11 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/client_channel.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/client_config/lb_policy_registry.h"
-#include "src/core/lib/client_config/resolver_registry.h"
-#include "src/core/lib/client_config/subchannel.h"
-#include "src/core/lib/client_config/subchannel_index.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -65,10 +59,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
-#ifndef GRPC_DEFAULT_NAME_PREFIX
-#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
-#endif
-
/* (generated) built in registry of plugins */
extern void grpc_register_built_in_plugins(void);
@@ -105,31 +95,35 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
}
static void register_builtin_channel_init() {
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
- prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
- maybe_add_http_filter,
- (void *)&grpc_http_client_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ prepend_filter, (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_http_filter, (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
- maybe_add_http_filter,
- (void *)&grpc_http_client_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_http_filter, (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
- maybe_add_http_filter,
- (void *)&grpc_http_server_filter);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_http_filter, (void *)&grpc_http_server_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
- (void *)&grpc_client_channel_filter);
- grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
append_filter, (void *)&grpc_lame_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
(void *)&grpc_server_top_filter);
@@ -161,12 +155,8 @@ void grpc_init(void) {
gpr_time_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
- grpc_lb_policy_registry_init();
- grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX);
grpc_register_tracer("api", &grpc_api_trace);
grpc_register_tracer("channel", &grpc_trace_channel);
- grpc_register_tracer("http", &grpc_http_trace);
- grpc_register_tracer("flowctl", &grpc_flowctl_trace);
grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
@@ -176,7 +166,6 @@ void grpc_init(void) {
grpc_tracer_init("GRPC_TRACE");
gpr_timers_global_init();
grpc_cq_global_init();
- grpc_subchannel_index_init();
for (i = 0; i < g_number_of_plugins; i++) {
if (g_all_of_the_plugins[i].init != NULL) {
g_all_of_the_plugins[i].init();
@@ -201,17 +190,13 @@ void grpc_shutdown(void) {
grpc_executor_shutdown();
grpc_cq_global_shutdown();
grpc_iomgr_shutdown();
- grpc_subchannel_index_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
- grpc_resolver_registry_shutdown();
- grpc_lb_policy_registry_shutdown();
- for (i = 0; i < g_number_of_plugins; i++) {
+ for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != NULL) {
g_all_of_the_plugins[i].destroy();
}
}
- grpc_channel_init_shutdown();
grpc_mdctx_global_shutdown();
}
gpr_mu_unlock(&g_init_mu);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 1898bee1c1..ad8ee8c7a9 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -95,7 +95,6 @@ typedef struct requested_call {
grpc_byte_buffer **optional_payload;
} registered;
} data;
- grpc_closure publish;
} requested_call;
typedef struct channel_registered_method {
@@ -156,15 +155,21 @@ struct call_data {
bool recv_idempotent_request;
grpc_metadata_array initial_metadata;
+ request_matcher *request_matcher;
+ grpc_byte_buffer *payload;
+
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure publish;
+
call_data *pending_next;
};
struct request_matcher {
+ grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree *requests;
@@ -173,6 +178,7 @@ struct request_matcher {
struct registered_method {
char *method;
char *host;
+ grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
request_matcher request_matcher;
registered_method *next;
@@ -226,8 +232,7 @@ struct grpc_server {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc);
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
@@ -303,8 +308,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, size_t entries) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+ grpc_server *server) {
memset(rm, 0, sizeof(*rm));
+ rm->server = server;
rm->requests = gpr_stack_lockfree_create(entries);
}
@@ -417,21 +424,90 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
&op);
}
-static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
- grpc_call_element *elem, request_matcher *rm) {
- call_data *calld = elem->call_data;
- int request_id;
+static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
+ gpr_slice slice = value->slice;
+ size_t len = GPR_SLICE_LENGTH(slice);
- if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ if (len + 1 > *capacity) {
+ *capacity = GPR_MAX(len + 1, *capacity * 2);
+ *dest = gpr_realloc(*dest, *capacity);
+ }
+ memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
+}
+
+static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
+ grpc_cq_completion *c) {
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls &&
+ rc < server->requested_calls + server->max_requested_calls) {
+ GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
+ gpr_stack_lockfree_push(server->request_freelist,
+ (int)(rc - server->requested_calls));
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(exec_ctx, server);
+}
+
+static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
+ call_data *calld, requested_call *rc) {
+ grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
+ grpc_call *call = calld->call;
+ *rc->call = call;
+ calld->cq_new = rc->cq_for_notification;
+ GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
+ switch (rc->type) {
+ case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
+ cpstr(&rc->data.batch.details->host,
+ &rc->data.batch.details->host_capacity, calld->host);
+ cpstr(&rc->data.batch.details->method,
+ &rc->data.batch.details->method_capacity, calld->path);
+ rc->data.batch.details->deadline = calld->deadline;
+ rc->data.batch.details->flags =
+ 0 | (calld->recv_idempotent_request
+ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ : 0);
+ break;
+ case REGISTERED_CALL:
+ *rc->data.registered.deadline = calld->deadline;
+ if (rc->data.registered.optional_payload) {
+ *rc->data.registered.optional_payload = calld->payload;
+ }
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
+ grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc,
+ &rc->completion);
+}
+
+static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ call_data *calld = arg;
+ request_matcher *rm = calld->request_matcher;
+ grpc_server *server = rm->server;
+
+ if (!success || gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
- request_id = gpr_stack_lockfree_pop(rm->requests);
+ int request_id = gpr_stack_lockfree_pop(rm->requests);
if (request_id == -1) {
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -449,7 +525,41 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ }
+}
+
+static void finish_start_new_rpc(
+ grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
+ request_matcher *rm,
+ grpc_server_register_method_payload_handling payload_handling) {
+ call_data *calld = elem->call_data;
+
+ if (gpr_atm_acq_load(&server->shutdown_flag)) {
+ gpr_mu_lock(&calld->mu_state);
+ calld->state = ZOMBIED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
+ return;
+ }
+
+ calld->request_matcher = rm;
+
+ switch (payload_handling) {
+ case GRPC_SRM_PAYLOAD_NONE:
+ publish_new_rpc(exec_ctx, calld, true);
+ break;
+ case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_RECV_MESSAGE;
+ op.data.recv_message = &calld->payload;
+ grpc_closure_init(&calld->publish, publish_new_rpc, calld);
+ grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
+ &calld->publish);
+ break;
+ }
}
}
@@ -475,7 +585,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -490,12 +601,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher);
+ &rm->server_registered_method->request_matcher,
+ rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher);
+ &server->unregistered_request_matcher,
+ GRPC_SRM_PAYLOAD_NONE);
}
static int num_listeners(grpc_server *server) {
@@ -572,10 +685,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (md->key == GRPC_MDSTR_PATH) {
- calld->path = GRPC_MDSTR_REF(md->value);
+ if (calld->path == NULL) {
+ calld->path = GRPC_MDSTR_REF(md->value);
+ }
return NULL;
} else if (md->key == GRPC_MDSTR_AUTHORITY) {
- calld->host = GRPC_MDSTR_REF(md->value);
+ if (calld->host == NULL) {
+ calld->host = GRPC_MDSTR_REF(md->value);
+ }
return NULL;
}
return md;
@@ -824,7 +941,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls);
+ server->max_requested_calls, server);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
@@ -840,8 +957,10 @@ static int streq(const char *a, const char *b) {
return 0 == strcmp(a, b);
}
-void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host, uint32_t flags) {
+void *grpc_server_register_method(
+ grpc_server *server, const char *method, const char *host,
+ grpc_server_register_method_payload_handling payload_handling,
+ uint32_t flags) {
registered_method *m;
GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, "
@@ -866,10 +985,12 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
- request_matcher_init(&m->request_matcher, server->max_requested_calls);
+ request_matcher_init(&m->request_matcher, server->max_requested_calls,
+ server);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
+ m->payload_handling = payload_handling;
m->flags = flags;
server->registered_methods = m;
return m;
@@ -1143,8 +1264,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- begin_call(exec_ctx, server, calld,
- &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld,
+ &server->requested_calls[request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1209,6 +1330,12 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
+ if ((optional_payload == NULL) !=
+ (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
+ goto done;
+ }
grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
@@ -1226,86 +1353,6 @@ done:
return error;
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, bool success);
-
-static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
- gpr_slice slice = value->slice;
- size_t len = GPR_SLICE_LENGTH(slice);
-
- if (len + 1 > *capacity) {
- *capacity = GPR_MAX(len + 1, *capacity * 2);
- *dest = gpr_realloc(*dest, *capacity);
- }
- memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
-}
-
-static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc) {
- grpc_op ops[1];
- grpc_op *op = ops;
-
- memset(ops, 0, sizeof(ops));
-
- /* called once initial metadata has been read by the call, but BEFORE
- the ioreq to fetch it out of the call has been executed.
- This means metadata related fields can be relied on in calld, but to
- fill in the metadata array passed by the client, we need to perform
- an ioreq op, that should complete immediately. */
-
- grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
- grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
- *rc->call = calld->call;
- calld->cq_new = rc->cq_for_notification;
- GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
- switch (rc->type) {
- case BATCH_CALL:
- GPR_ASSERT(calld->host != NULL);
- GPR_ASSERT(calld->path != NULL);
- cpstr(&rc->data.batch.details->host,
- &rc->data.batch.details->host_capacity, calld->host);
- cpstr(&rc->data.batch.details->method,
- &rc->data.batch.details->method_capacity, calld->path);
- rc->data.batch.details->deadline = calld->deadline;
- rc->data.batch.details->flags =
- 0 | (calld->recv_idempotent_request
- ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
- : 0);
- break;
- case REGISTERED_CALL:
- *rc->data.registered.deadline = calld->deadline;
- if (rc->data.registered.optional_payload) {
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message = rc->data.registered.optional_payload;
- op++;
- }
- break;
- default:
- GPR_UNREACHABLE_CODE(return );
- }
-
- GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
- (size_t)(op - ops), &rc->publish);
-}
-
-static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
- grpc_cq_completion *c) {
- requested_call *rc = req;
- grpc_server *server = rc->server;
-
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
- } else {
- gpr_free(req);
- }
-
- server_unref(exec_ctx, server);
-}
-
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc) {
*rc->call = NULL;
@@ -1316,20 +1363,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
done_request_event, rc, &rc->completion);
}
-static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
- bool success) {
- requested_call *rc = prc;
- grpc_call *call = *rc->call;
- grpc_call_element *elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- server_ref(chand->server);
- grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
- rc, &rc->completion);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
-}
-
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
return server->channel_args;
}