aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-14 13:15:56 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-14 13:15:56 -0700
commitc3666d70902ff7b9c1735761d7156232e84ef4e4 (patch)
tree9ffc8ca7d7f9fac109b49512cf783a75a27244d1 /src/core/lib/surface
parent57e6485e2d104a8af1aca76314d188471252761d (diff)
parente412a180602753972ac496560322e224a5db987f (diff)
Merge github.com:grpc/grpc into hybrid
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/channel_init.c21
-rw-r--r--src/core/lib/surface/channel_stack_type.c18
-rw-r--r--src/core/lib/surface/channel_stack_type.h2
-rw-r--r--src/core/lib/surface/init.c50
-rw-r--r--src/core/lib/surface/server.c41
5 files changed, 57 insertions, 75 deletions
diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c
index 7acb444d9b..20f5753004 100644
--- a/src/core/lib/surface/channel_init.c
+++ b/src/core/lib/surface/channel_init.c
@@ -104,30 +104,13 @@ void grpc_channel_init_shutdown(void) {
}
}
-static const char *name_for_type(grpc_channel_stack_type type) {
- switch (type) {
- case GRPC_CLIENT_CHANNEL:
- return "CLIENT_CHANNEL";
- case GRPC_CLIENT_SUBCHANNEL:
- return "CLIENT_SUBCHANNEL";
- case GRPC_SERVER_CHANNEL:
- return "SERVER_CHANNEL";
- case GRPC_CLIENT_LAME_CHANNEL:
- return "CLIENT_LAME_CHANNEL";
- case GRPC_CLIENT_DIRECT_CHANNEL:
- return "CLIENT_DIRECT_CHANNEL";
- case GRPC_NUM_CHANNEL_STACK_TYPES:
- break;
- }
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
-}
-
bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
- grpc_channel_stack_builder_set_name(builder, name_for_type(type));
+ grpc_channel_stack_builder_set_name(builder,
+ grpc_channel_stack_type_string(type));
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
const stage_slot *slot = &g_slots[type].slots[i];
diff --git a/src/core/lib/surface/channel_stack_type.c b/src/core/lib/surface/channel_stack_type.c
index c35d603ca3..ed3b53fb36 100644
--- a/src/core/lib/surface/channel_stack_type.c
+++ b/src/core/lib/surface/channel_stack_type.c
@@ -52,3 +52,21 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) {
}
GPR_UNREACHABLE_CODE(return true;);
}
+
+const char *grpc_channel_stack_type_string(grpc_channel_stack_type type) {
+ switch (type) {
+ case GRPC_CLIENT_CHANNEL:
+ return "CLIENT_CHANNEL";
+ case GRPC_CLIENT_SUBCHANNEL:
+ return "CLIENT_SUBCHANNEL";
+ case GRPC_SERVER_CHANNEL:
+ return "SERVER_CHANNEL";
+ case GRPC_CLIENT_LAME_CHANNEL:
+ return "CLIENT_LAME_CHANNEL";
+ case GRPC_CLIENT_DIRECT_CHANNEL:
+ return "CLIENT_DIRECT_CHANNEL";
+ case GRPC_NUM_CHANNEL_STACK_TYPES:
+ break;
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h
index 4eea4f1b01..ccf4e53d27 100644
--- a/src/core/lib/surface/channel_stack_type.h
+++ b/src/core/lib/surface/channel_stack_type.h
@@ -55,4 +55,6 @@ typedef enum {
bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type);
+const char *grpc_channel_stack_type_string(grpc_channel_stack_type type);
+
#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_STACK_TYPE_H */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 91bd014a0e..4b381b1954 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -41,13 +41,8 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/handshaker_registry.h"
-#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -95,57 +90,13 @@ static bool prepend_filter(grpc_exec_ctx *exec_ctx,
builder, (const grpc_channel_filter *)arg, NULL, NULL);
}
-static bool maybe_add_http_filter(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack_builder *builder,
- void *arg) {
- grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
- if (t && strstr(t->vtable->name, "http")) {
- return grpc_channel_stack_builder_prepend_filter(
- builder, (const grpc_channel_filter *)arg, NULL, NULL);
- }
- return true;
-}
-
static void register_builtin_channel_init() {
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_client_deadline_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_server_deadline_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_client_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_client_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_server_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
@@ -189,7 +140,6 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
- grpc_register_tracer("compression", &grpc_compression_trace);
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
grpc_register_tracer("combiner", &grpc_combiner_trace);
grpc_register_tracer("server_channel", &grpc_server_channel_trace);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index da8b6339b2..97d671b1b6 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -44,6 +44,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h"
@@ -211,6 +212,11 @@ struct grpc_server {
gpr_mu mu_global; /* mutex for server and channel state */
gpr_mu mu_call; /* mutex for call-specific state */
+ /* startup synchronization: flag is protected by mu_global, signals whether
+ we are doing the listener start routine or not */
+ bool starting;
+ gpr_cv starting_cv;
+
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
@@ -388,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
grpc_channel_args_destroy(exec_ctx, server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
+ gpr_cv_destroy(&server->starting_cv);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
if (server->started) {
@@ -1017,6 +1024,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
+ gpr_cv_init(&server->starting_cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -1073,8 +1081,22 @@ void *grpc_server_register_method(
return m;
}
+static void start_listeners(grpc_exec_ctx *exec_ctx, void *s,
+ grpc_error *error) {
+ grpc_server *server = s;
+ for (listener *l = server->listeners; l; l = l->next) {
+ l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
+ }
+
+ gpr_mu_lock(&server->mu_global);
+ server->starting = false;
+ gpr_cv_signal(&server->starting_cv);
+ gpr_mu_unlock(&server->mu_global);
+
+ server_unref(exec_ctx, server);
+}
+
void grpc_server_start(grpc_server *server) {
- listener *l;
size_t i;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -1108,10 +1130,11 @@ void grpc_server_start(grpc_server *server) {
(size_t)server->max_requested_calls_per_cq, server);
}
- for (l = server->listeners; l; l = l->next) {
- l->start(&exec_ctx, server, l->arg, server->pollsets,
- server->pollset_count);
- }
+ server_ref(server);
+ server->starting = true;
+ grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server,
+ grpc_executor_scheduler),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -1245,8 +1268,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
(server, cq, tag));
- /* lock, and gather up some stuff to do */
+ /* wait for startup to be finished: locks mu_global */
gpr_mu_lock(&server->mu_global);
+ while (server->starting) {
+ gpr_cv_wait(&server->starting_cv, &server->mu_global,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+
+ /* stay locked, and gather up some stuff to do */
grpc_cq_begin_op(cq, tag);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,