aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-14 15:18:20 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-14 15:18:20 -0700
commit56c23f11b04192a977fdd9a4cbb21259f4a87794 (patch)
tree3cea7a6a830a1a95f318b678da52d1b490da23b6 /src/core/lib/surface
parented88abcad8f2857701d2a14f21c92a9e9d4dbc0d (diff)
parent88ce393687d3c471c879734aeb2015fb54394eeb (diff)
Merge github.com:grpc/grpc into c++lame
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c44
-rw-r--r--src/core/lib/surface/channel_init.c21
-rw-r--r--src/core/lib/surface/channel_stack_type.c18
-rw-r--r--src/core/lib/surface/channel_stack_type.h2
-rw-r--r--src/core/lib/surface/completion_queue.c48
-rw-r--r--src/core/lib/surface/completion_queue.h6
-rw-r--r--src/core/lib/surface/completion_queue_factory.c33
-rw-r--r--src/core/lib/surface/init.c50
-rw-r--r--src/core/lib/surface/server.c50
-rw-r--r--src/core/lib/surface/version.c2
10 files changed, 176 insertions, 98 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 97d50a91be..3e96d09798 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1187,6 +1187,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
+ grpc_error *error;
grpc_call *call = bctl->call;
for (;;) {
size_t remaining = call->receiving_stream->length -
@@ -1198,11 +1199,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
finish_batch_step(exec_ctx, bctl);
return;
}
- if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
- &call->receiving_slice, remaining,
+ if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
- grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
+ error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
+ &call->receiving_slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
+ } else {
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ grpc_byte_buffer_destroy(*call->receiving_buffer);
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ finish_batch_step(exec_ctx, bctl);
+ return;
+ }
} else {
return;
}
@@ -1213,12 +1225,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
+ grpc_byte_stream *bs = call->receiving_stream;
+ bool release_error = false;
if (error == GRPC_ERROR_NONE) {
- grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
- continue_receiving_slices(exec_ctx, bctl);
- } else {
+ grpc_slice slice;
+ error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ slice);
+ continue_receiving_slices(exec_ctx, bctl);
+ } else {
+ /* Error returned by grpc_byte_stream_pull needs to be released manually
+ */
+ release_error = true;
+ }
+ }
+
+ if (error != GRPC_ERROR_NONE) {
if (grpc_trace_operation_failures) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
@@ -1226,7 +1250,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
+ call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl);
+ if (release_error) {
+ GRPC_ERROR_UNREF(error);
+ }
}
}
diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c
index 7acb444d9b..20f5753004 100644
--- a/src/core/lib/surface/channel_init.c
+++ b/src/core/lib/surface/channel_init.c
@@ -104,30 +104,13 @@ void grpc_channel_init_shutdown(void) {
}
}
-static const char *name_for_type(grpc_channel_stack_type type) {
- switch (type) {
- case GRPC_CLIENT_CHANNEL:
- return "CLIENT_CHANNEL";
- case GRPC_CLIENT_SUBCHANNEL:
- return "CLIENT_SUBCHANNEL";
- case GRPC_SERVER_CHANNEL:
- return "SERVER_CHANNEL";
- case GRPC_CLIENT_LAME_CHANNEL:
- return "CLIENT_LAME_CHANNEL";
- case GRPC_CLIENT_DIRECT_CHANNEL:
- return "CLIENT_DIRECT_CHANNEL";
- case GRPC_NUM_CHANNEL_STACK_TYPES:
- break;
- }
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
-}
-
bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
- grpc_channel_stack_builder_set_name(builder, name_for_type(type));
+ grpc_channel_stack_builder_set_name(builder,
+ grpc_channel_stack_type_string(type));
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
const stage_slot *slot = &g_slots[type].slots[i];
diff --git a/src/core/lib/surface/channel_stack_type.c b/src/core/lib/surface/channel_stack_type.c
index c35d603ca3..ed3b53fb36 100644
--- a/src/core/lib/surface/channel_stack_type.c
+++ b/src/core/lib/surface/channel_stack_type.c
@@ -52,3 +52,21 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) {
}
GPR_UNREACHABLE_CODE(return true;);
}
+
+const char *grpc_channel_stack_type_string(grpc_channel_stack_type type) {
+ switch (type) {
+ case GRPC_CLIENT_CHANNEL:
+ return "CLIENT_CHANNEL";
+ case GRPC_CLIENT_SUBCHANNEL:
+ return "CLIENT_SUBCHANNEL";
+ case GRPC_SERVER_CHANNEL:
+ return "SERVER_CHANNEL";
+ case GRPC_CLIENT_LAME_CHANNEL:
+ return "CLIENT_LAME_CHANNEL";
+ case GRPC_CLIENT_DIRECT_CHANNEL:
+ return "CLIENT_DIRECT_CHANNEL";
+ case GRPC_NUM_CHANNEL_STACK_TYPES:
+ break;
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h
index 4eea4f1b01..ccf4e53d27 100644
--- a/src/core/lib/surface/channel_stack_type.h
+++ b/src/core/lib/surface/channel_stack_type.h
@@ -55,4 +55,6 @@ typedef enum {
bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type);
+const char *grpc_channel_stack_type_string(grpc_channel_stack_type type);
+
#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_STACK_TYPE_H */
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3273addf1d..35e9f7eb30 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -64,6 +64,10 @@ typedef struct {
struct grpc_completion_queue {
/** owned by pollset */
gpr_mu *mu;
+
+ grpc_cq_completion_type completion_type;
+ grpc_cq_polling_type polling_type;
+
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -79,6 +83,7 @@ struct grpc_completion_queue {
int shutdown_called;
int is_server_cq;
/** Can the server cq accept incoming channels */
+ /* TODO: sreek - This will no longer be needed. Use polling_type set */
int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -110,13 +115,17 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
-grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type,
+ grpc_cq_polling_type polling_type) {
grpc_completion_queue *cc;
- GPR_ASSERT(!reserved);
- GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
+ GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
- GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
+ GRPC_API_TRACE(
+ "grpc_completion_queue_create_internal(completion_type=%d, "
+ "polling_type=%d)",
+ 2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
@@ -125,6 +134,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->outstanding_tag_capacity = 0;
#endif
+ cc->completion_type = completion_type;
+ cc->polling_type = polling_type;
+
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
@@ -143,11 +155,19 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
grpc_schedule_on_exec_ctx);
- GPR_TIMER_END("grpc_completion_queue_create", 0);
+ GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
return cc;
}
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
+ return cc->completion_type;
+}
+
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
+ return cc->polling_type;
+}
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -347,6 +367,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_event ret;
gpr_timespec now;
+ if (cc->completion_type != GRPC_CQ_NEXT) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_next() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_NEXT");
+ abort();
+ }
+
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
@@ -516,6 +543,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
+ if (cc->completion_type != GRPC_CQ_PLUCK) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_pluck() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_PLUCK");
+ abort();
+ }
+
if (grpc_cq_pluck_trace) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
@@ -680,10 +714,14 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
}
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO: sreek - use cc->polling_type field here and add a validation check
+ (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose
+ polling_type is set to GRPC_CQ_NON_LISTENING */
cc->is_non_listening_server_cq = 1;
}
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */
return (cc->is_non_listening_server_cq == 1);
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 5d73dd7216..1ff3d64293 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -99,4 +99,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
+
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
+
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c
index db67a5192b..d68b84eddd 100644
--- a/src/core/lib/surface/completion_queue_factory.c
+++ b/src/core/lib/surface/completion_queue_factory.c
@@ -36,12 +36,15 @@
#include <grpc/support/log.h>
-/* TODO (sreek) - Currently this does not use the attributes arg. This will be
- added in a future PR */
+/*
+ * == Default completion queue factory implementation ==
+ */
+
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
- const grpc_completion_queue_attributes* attributes) {
- return grpc_completion_queue_create(NULL);
+ const grpc_completion_queue_attributes* attr) {
+ return grpc_completion_queue_create_internal(attr->cq_completion_type,
+ attr->cq_polling_type);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -49,19 +52,24 @@ static grpc_completion_queue_factory_vtable default_vtable = {default_create};
static const grpc_completion_queue_factory g_default_cq_factory = {
"Default Factory", NULL, &default_vtable};
+/*
+ * == Completion queue factory APIs
+ */
+
const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) {
- /* As we add more fields to grpc_completion_queue_attributes, we may have to
- change this assert to:
- GPR_ASSERT (attributes->version >= 1 &&
- attributes->version <= GRPC_CQ_CURRENT_VERSION) */
- GPR_ASSERT(attributes->version == 1);
+ GPR_ASSERT(attributes->version >= 1 &&
+ attributes->version <= GRPC_CQ_CURRENT_VERSION);
/* The default factory can handle version 1 of the attributes structure. We
may have to change this as more fields are added to the structure */
return &g_default_cq_factory;
}
+/*
+ * == Completion queue creation APIs ==
+ */
+
grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT,
@@ -75,3 +83,10 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
GRPC_CQ_DEFAULT_POLLING};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
+
+grpc_completion_queue* grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attr, void* reserved) {
+ GPR_ASSERT(!reserved);
+ return factory->vtable->create(factory, attr);
+}
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 91bd014a0e..4b381b1954 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -41,13 +41,8 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/handshaker_registry.h"
-#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -95,57 +90,13 @@ static bool prepend_filter(grpc_exec_ctx *exec_ctx,
builder, (const grpc_channel_filter *)arg, NULL, NULL);
}
-static bool maybe_add_http_filter(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack_builder *builder,
- void *arg) {
- grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
- if (t && strstr(t->vtable->name, "http")) {
- return grpc_channel_stack_builder_prepend_filter(
- builder, (const grpc_channel_filter *)arg, NULL, NULL);
- }
- return true;
-}
-
static void register_builtin_channel_init() {
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_client_deadline_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_server_deadline_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_client_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_client_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_server_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
@@ -189,7 +140,6 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
- grpc_register_tracer("compression", &grpc_compression_trace);
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
grpc_register_tracer("combiner", &grpc_combiner_trace);
grpc_register_tracer("server_channel", &grpc_server_channel_trace);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 191ee75252..38800ea37b 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -44,6 +44,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h"
@@ -211,6 +212,11 @@ struct grpc_server {
gpr_mu mu_global; /* mutex for server and channel state */
gpr_mu mu_call; /* mutex for call-specific state */
+ /* startup synchronization: flag is protected by mu_global, signals whether
+ we are doing the listener start routine or not */
+ bool starting;
+ gpr_cv starting_cv;
+
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
@@ -388,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
grpc_channel_args_destroy(exec_ctx, server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
+ gpr_cv_destroy(&server->starting_cv);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
if (server->started) {
@@ -1000,6 +1007,15 @@ void grpc_server_register_completion_queue(grpc_server *server,
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
+
+ if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
+ gpr_log(GPR_INFO,
+ "Completion queue which is not of type GRPC_CQ_NEXT is being "
+ "registered as a server-completion-queue");
+ /* Ideally we should log an error and abort but ruby-wrapped-language API
+ calls grpc_completion_queue_pluck() on server completion queues */
+ }
+
register_completion_queue(server, cq, false, reserved);
}
@@ -1021,6 +1037,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
+ gpr_cv_init(&server->starting_cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -1077,8 +1094,22 @@ void *grpc_server_register_method(
return m;
}
+static void start_listeners(grpc_exec_ctx *exec_ctx, void *s,
+ grpc_error *error) {
+ grpc_server *server = s;
+ for (listener *l = server->listeners; l; l = l->next) {
+ l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
+ }
+
+ gpr_mu_lock(&server->mu_global);
+ server->starting = false;
+ gpr_cv_signal(&server->starting_cv);
+ gpr_mu_unlock(&server->mu_global);
+
+ server_unref(exec_ctx, server);
+}
+
void grpc_server_start(grpc_server *server) {
- listener *l;
size_t i;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -1112,10 +1143,11 @@ void grpc_server_start(grpc_server *server) {
(size_t)server->max_requested_calls_per_cq, server);
}
- for (l = server->listeners; l; l = l->next) {
- l->start(&exec_ctx, server, l->arg, server->pollsets,
- server->pollset_count);
- }
+ server_ref(server);
+ server->starting = true;
+ grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server,
+ grpc_executor_scheduler),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -1249,8 +1281,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
(server, cq, tag));
- /* lock, and gather up some stuff to do */
+ /* wait for startup to be finished: locks mu_global */
gpr_mu_lock(&server->mu_global);
+ while (server->starting) {
+ gpr_cv_wait(&server->starting_cv, &server->mu_global,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+
+ /* stay locked, and gather up some stuff to do */
grpc_cq_begin_op(cq, tag);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index ba80bd801e..3793845559 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,6 +36,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "3.0.0-dev"; }
+const char *grpc_version_string(void) { return "4.0.0-dev"; }
const char *grpc_g_stands_for(void) { return "gentle"; }