aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-06-19 13:58:52 +0200
committerGravatar Julien Boeuf <jboeuf@google.com>2015-06-19 13:58:52 +0200
commitafc8c77366708e9fe711b1d38a7f19e74ba6bc1b (patch)
tree8ca6555e5763fdd2b6e17fc54e047bc4a54c08ad /src/core
parent3e29de19a3768f277a76e1da2f26ed17c591cc37 (diff)
parenta15f08cc9a8004785ec937b2ee06951b600d87b6 (diff)
Merge branch 'master' of https://github.com/grpc/grpc into tsi_ssl_npn
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/channel_args.c24
-rw-r--r--src/core/channel/channel_args.h18
-rw-r--r--src/core/compression/algorithm.c18
-rw-r--r--src/core/iomgr/pollset_posix.c31
-rw-r--r--src/core/iomgr/tcp_windows.c2
-rw-r--r--src/core/support/sync.c4
-rw-r--r--src/core/surface/call.c41
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/surface/completion_queue.c38
-rw-r--r--src/core/surface/server.c201
-rw-r--r--src/core/transport/stream_op.h9
11 files changed, 269 insertions, 118 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 1b0e33b123..166d559a45 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -115,3 +115,27 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
}
return 0;
}
+
+grpc_compression_level grpc_channel_args_get_compression_level(
+ const grpc_channel_args *a) {
+ size_t i;
+ if (a) {
+ for (i = 0; a && i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) {
+ return a->args[i].value.integer;
+ break;
+ }
+ }
+ }
+ return GRPC_COMPRESS_LEVEL_NONE;
+}
+
+void grpc_channel_args_set_compression_level(
+ grpc_channel_args **a, grpc_compression_level level) {
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_LEVEL_ARG;
+ tmp.value.integer = level;
+ *a = grpc_channel_args_copy_and_add(*a, &tmp);
+}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index eb5bf63986..bf747b26e6 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -34,21 +34,31 @@
#ifndef GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H
#define GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H
+#include <grpc/compression.h>
#include <grpc/grpc.h>
/* Copy some arguments */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
-/* Copy some arguments and add the to_add parameter in the end.
+/** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add);
-/* Destroy arguments created by grpc_channel_args_copy */
+/** Destroy arguments created by grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
-/* Reads census_enabled settings from channel args. Returns 1 if census_enabled
- is specified in channel args, otherwise returns 0. */
+/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
+ * is specified in channel args, otherwise returns 0. */
int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
+/** Returns the compression level set in \a a. */
+grpc_compression_level grpc_channel_args_get_compression_level(
+ const grpc_channel_args *a);
+
+/** Sets the compression level in \a a to \a level. Setting it to
+ * GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */
+void grpc_channel_args_set_compression_level(
+ grpc_channel_args **a, grpc_compression_level level);
+
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index 36ead843d2..4db48df6cb 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -31,6 +31,7 @@
*
*/
+#include <stdlib.h>
#include <grpc/compression.h>
const char *grpc_compression_algorithm_name(
@@ -47,3 +48,20 @@ const char *grpc_compression_algorithm_name(
}
return "error";
}
+
+/* TODO(dgq): Add the ability to specify parameters to the individual
+ * compression algorithms */
+grpc_compression_algorithm grpc_compression_algorithm_for_level(
+ grpc_compression_level level) {
+ switch (level) {
+ case GRPC_COMPRESS_LEVEL_NONE:
+ return GRPC_COMPRESS_NONE;
+ case GRPC_COMPRESS_LEVEL_LOW:
+ case GRPC_COMPRESS_LEVEL_MED:
+ case GRPC_COMPRESS_LEVEL_HIGH:
+ return GRPC_COMPRESS_DEFLATE;
+ default:
+ /* we shouldn't be making it here */
+ abort();
+ }
+}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index a8e6069002..d2f615271e 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -174,8 +174,6 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now();
- /* FIXME(ctiller): see below */
- gpr_timespec maximum_deadline = gpr_time_add(now, gpr_time_from_seconds(1));
int r;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
@@ -186,14 +184,25 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1;
}
- /* FIXME(ctiller): we should not clamp deadline, however we have some
- stuck at shutdown bugs that this resolves */
- if (gpr_time_cmp(deadline, maximum_deadline) > 0) {
- deadline = maximum_deadline;
+ if (pollset->shutting_down) {
+ return 1;
}
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
r = pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
+ if (pollset->shutting_down) {
+ if (pollset->counter > 0) {
+ grpc_pollset_kick(pollset);
+ } else if (pollset->in_flight_cbs == 0) {
+ gpr_mu_unlock(&pollset->mu);
+ pollset->shutdown_done_cb(pollset->shutdown_done_arg);
+ /* Continuing to access pollset here is safe -- it is the caller's
+ * responsibility to not destroy when it has outstanding calls to
+ * grpc_pollset_work.
+ * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
+ gpr_mu_lock(&pollset->mu);
+ }
+ }
return r;
}
@@ -201,13 +210,19 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
int in_flight_cbs;
+ int counter;
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
in_flight_cbs = pollset->in_flight_cbs;
+ counter = pollset->counter;
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
+ if (counter > 0) {
+ grpc_pollset_kick(pollset);
+ }
gpr_mu_unlock(&pollset->mu);
- if (in_flight_cbs == 0) {
+
+ if (in_flight_cbs == 0 && counter == 0) {
shutdown_done(shutdown_done_arg);
}
}
@@ -294,7 +309,7 @@ static void unary_poll_do_promote(void *args, int success) {
pollset->in_flight_cbs--;
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
- if (pollset->in_flight_cbs == 0) {
+ if (pollset->in_flight_cbs == 0 && pollset->counter == 0) {
do_shutdown_cb = 1;
}
} else if (grpc_fd_is_orphaned(fd)) {
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 12dac03080..15759c398a 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -154,7 +154,7 @@ static void on_read(void *tcpp, int from_iocp) {
status = GRPC_ENDPOINT_CB_ERROR;
} else {
if (info->bytes_transfered != 0) {
- sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
+ sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
status = GRPC_ENDPOINT_CB_OK;
slice = &sub;
nslices = 1;
diff --git a/src/core/support/sync.c b/src/core/support/sync.c
index ccfe1e25f4..856b5adb86 100644
--- a/src/core/support/sync.c
+++ b/src/core/support/sync.c
@@ -118,7 +118,9 @@ void gpr_refn(gpr_refcount *r, int n) {
}
int gpr_unref(gpr_refcount *r) {
- return gpr_atm_full_fetch_add(&r->count, -1) == 1;
+ gpr_atm prior = gpr_atm_full_fetch_add(&r->count, -1);
+ GPR_ASSERT(prior > 0);
+ return prior == 1;
}
void gpr_stats_init(gpr_stats_counter *c, gpr_intptr n) {
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index cead5e08dc..5cdd7cd0f6 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -185,6 +185,7 @@ struct grpc_call {
and a strong upper bound of a count of masters to be calculated. */
gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+ gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
/* Dynamic array of ioreq's that have completed: the count of
@@ -228,6 +229,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
};
@@ -670,6 +672,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
} else if (msg.length > 0) {
call->reading_message = 1;
call->incoming_message_length = msg.length;
+ call->incoming_message_flags = msg.flags;
return 1;
} else {
finish_message(call);
@@ -818,6 +821,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
+ gpr_uint32 flags;
grpc_metadata_batch mdb;
size_t i;
GPR_ASSERT(op->send_ops == NULL);
@@ -844,8 +848,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
grpc_sopb_add_begin_message(
- &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
@@ -979,6 +984,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
have_ops |= 1u << op;
call->request_data[op] = data;
+ call->request_flags[op] = reqs[i].flags;
call->request_set[op] = set;
}
@@ -1189,6 +1195,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
+static int are_write_flags_valid(gpr_uint32 flags) {
+ /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+ const gpr_uint32 allowed_write_positions =
+ (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
+ const gpr_uint32 invalid_positions = ~allowed_write_positions;
+ return !(flags & invalid_positions);
+}
+
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
@@ -1211,30 +1225,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op = &ops[in];
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
op->data.send_initial_metadata.metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
+ if (!are_write_flags_valid(op->flags)){
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
+ req->flags = ops->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ req->flags = op->flags;
req->data.send_metadata.count =
op->data.send_status_from_server.trailing_metadata_count;
req->data.send_metadata.metadata =
@@ -1248,24 +1275,33 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
@@ -1283,8 +1319,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 9116538948..7a14161253 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -79,6 +79,7 @@ typedef union {
typedef struct {
grpc_ioreq_op op;
grpc_ioreq_data data;
+ gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 8c9ca48a05..b48fbace31 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -83,7 +83,8 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
- gpr_ref_init(&cc->owning_refs, 1);
+ /* One for destroy(), one for pollset_shutdown */
+ gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
@@ -95,14 +96,14 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
static void on_pollset_destroy_done(void *arg) {
grpc_completion_queue *cc = arg;
- grpc_pollset_destroy(&cc->pollset);
- gpr_free(cc);
+ grpc_cq_internal_unref(cc);
}
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->queue == NULL);
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_destroy(&cc->pollset);
+ gpr_free(cc);
}
}
@@ -145,25 +146,25 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
-static void end_op_locked(grpc_completion_queue *cc,
- grpc_completion_type type) {
- if (gpr_unref(&cc->refs)) {
- GPR_ASSERT(!cc->shutdown);
- GPR_ASSERT(cc->shutdown_called);
- cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- }
-}
-
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) {
event *ev;
+ int shutdown = 0;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success;
- end_op_locked(cc, GRPC_OP_COMPLETE);
+ if (gpr_unref(&cc->refs)) {
+ GPR_ASSERT(!cc->shutdown);
+ GPR_ASSERT(cc->shutdown_called);
+ cc->shutdown = 1;
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ shutdown = 1;
+ }
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
+ if (shutdown) {
+ grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ }
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -179,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
event *ev = NULL;
grpc_event ret;
+ grpc_cq_internal_ref(cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->queue != NULL) {
@@ -214,6 +216,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
+ grpc_cq_internal_unref(cc);
return ret;
}
}
@@ -221,6 +224,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
+ grpc_cq_internal_unref(cc);
return ret;
}
@@ -258,6 +262,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
event *ev = NULL;
grpc_event ret;
+ grpc_cq_internal_ref(cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if ((ev = pluck_event(cc, tag))) {
@@ -276,6 +281,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
+ grpc_cq_internal_unref(cc);
return ret;
}
}
@@ -283,6 +289,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret = ev->base;
gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
+ grpc_cq_internal_unref(cc);
return ret;
}
@@ -299,6 +306,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
cc->shutdown = 1;
gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 733f0e8a11..825ef66804 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -127,6 +127,11 @@ struct channel_data {
grpc_iomgr_closure finish_destroy_channel_closure;
};
+typedef struct shutdown_tag {
+ void *tag;
+ grpc_completion_queue *cq;
+} shutdown_tag;
+
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
@@ -137,14 +142,14 @@ struct grpc_server {
size_t cq_count;
gpr_mu mu;
- gpr_cv cv;
registered_method *registered_methods;
requested_call_array requested_calls;
gpr_uint8 shutdown;
+ gpr_uint8 shutdown_published;
size_t num_shutdown_tags;
- void **shutdown_tags;
+ shutdown_tag *shutdown_tags;
call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data;
@@ -261,29 +266,32 @@ static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
-static void server_unref(grpc_server *server) {
+static void server_delete(grpc_server *server) {
registered_method *rm;
size_t i;
+ grpc_channel_args_destroy(server->channel_args);
+ gpr_mu_destroy(&server->mu);
+ gpr_free(server->channel_filters);
+ requested_call_array_destroy(&server->requested_calls);
+ while ((rm = server->registered_methods) != NULL) {
+ server->registered_methods = rm->next;
+ gpr_free(rm->method);
+ gpr_free(rm->host);
+ requested_call_array_destroy(&rm->requested);
+ gpr_free(rm);
+ }
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_internal_unref(server->cqs[i]);
+ }
+ gpr_free(server->cqs);
+ gpr_free(server->pollsets);
+ gpr_free(server->shutdown_tags);
+ gpr_free(server);
+}
+
+static void server_unref(grpc_server *server) {
if (gpr_unref(&server->internal_refcount)) {
- grpc_channel_args_destroy(server->channel_args);
- gpr_mu_destroy(&server->mu);
- gpr_cv_destroy(&server->cv);
- gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
- while ((rm = server->registered_methods) != NULL) {
- server->registered_methods = rm->next;
- gpr_free(rm->method);
- gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
- gpr_free(rm);
- }
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_internal_unref(server->cqs[i]);
- }
- gpr_free(server->cqs);
- gpr_free(server->pollsets);
- gpr_free(server->shutdown_tags);
- gpr_free(server);
+ server_delete(server);
}
}
@@ -378,6 +386,26 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
+static int num_listeners(grpc_server *server) {
+ listener *l;
+ int n = 0;
+ for (l = server->listeners; l; l = l->next) {
+ n++;
+ }
+ return n;
+}
+
+static void maybe_finish_shutdown(grpc_server *server) {
+ size_t i;
+ if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
+ server->shutdown_published = 1;
+ for (i = 0; i < server->num_shutdown_tags; i++) {
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
+ NULL, 1);
+ }
+ }
+}
+
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -441,6 +469,9 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
+ if (call_list_remove(calld, ALL_CALLS)) {
+ maybe_finish_shutdown(chand->server);
+ }
gpr_mu_unlock(&chand->server->mu);
break;
}
@@ -539,19 +570,15 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- size_t i, j;
+ int removed[CALL_LIST_COUNT];
+ size_t i;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- call_list_remove(elem->call_data, i);
+ removed[i] = call_list_remove(elem->call_data, i);
}
- if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
- for (i = 0; i < chand->server->num_shutdown_tags; i++) {
- for (j = 0; j < chand->server->cq_count; j++) {
- grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i],
- NULL, 1);
- }
- }
+ if (removed[ALL_CALLS]) {
+ maybe_finish_shutdown(chand->server);
}
gpr_mu_unlock(&chand->server->mu);
@@ -646,7 +673,6 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset(server, 0, sizeof(grpc_server));
gpr_mu_init(&server->mu);
- gpr_cv_init(&server->cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -806,38 +832,28 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result;
}
-static int num_listeners(grpc_server *server) {
- listener *l;
- int n = 0;
- for (l = server->listeners; l; l = l->next) {
- n++;
- }
- return n;
-}
-
-static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
- void *shutdown_tag) {
+void grpc_server_shutdown_and_notify(grpc_server *server,
+ grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
channel_data **channels;
channel_data *c;
size_t nchannels;
- size_t i, j;
+ size_t i;
grpc_channel_op op;
grpc_channel_element *elem;
registered_method *rm;
+ shutdown_tag *sdt;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
- if (have_shutdown_tag) {
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_begin_op(server->cqs[i], NULL);
- }
- server->shutdown_tags =
- gpr_realloc(server->shutdown_tags,
- sizeof(void *) * (server->num_shutdown_tags + 1));
- server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
- }
+ grpc_cq_begin_op(cq, NULL);
+ server->shutdown_tags =
+ gpr_realloc(server->shutdown_tags,
+ sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
+ sdt = &server->shutdown_tags[server->num_shutdown_tags++];
+ sdt->tag = tag;
+ sdt->cq = cq;
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
return;
@@ -878,13 +894,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
server->shutdown = 1;
- if (server->lists[ALL_CALLS] == NULL) {
- for (i = 0; i < server->num_shutdown_tags; i++) {
- for (j = 0; j < server->cq_count; j++) {
- grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1);
- }
- }
- }
+ maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
for (i = 0; i < nchannels; i++) {
@@ -914,46 +924,64 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
}
-void grpc_server_shutdown(grpc_server *server) {
- shutdown_internal(server, 0, NULL);
-}
-
-void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
- shutdown_internal(server, 1, tag);
-}
-
void grpc_server_listener_destroy_done(void *s) {
grpc_server *server = s;
gpr_mu_lock(&server->mu);
server->listeners_destroyed++;
- gpr_cv_signal(&server->cv);
+ maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
}
-void grpc_server_destroy(grpc_server *server) {
- channel_data *c;
- listener *l;
- size_t i;
+void grpc_server_cancel_all_calls(grpc_server *server) {
call_data *calld;
+ grpc_call **calls;
+ size_t call_count;
+ size_t call_capacity;
+ int is_first = 1;
+ size_t i;
gpr_mu_lock(&server->mu);
- if (!server->shutdown) {
+
+ GPR_ASSERT(server->shutdown);
+
+ if (!server->lists[ALL_CALLS]) {
gpr_mu_unlock(&server->mu);
- grpc_server_shutdown(server);
- gpr_mu_lock(&server->mu);
+ return;
}
- while (server->listeners_destroyed != num_listeners(server)) {
- for (i = 0; i < server->cq_count; i++) {
- gpr_mu_unlock(&server->mu);
- grpc_cq_hack_spin_pollset(server->cqs[i]);
- gpr_mu_lock(&server->mu);
+ call_capacity = 8;
+ call_count = 0;
+ calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
+
+ for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) {
+ if (call_count == call_capacity) {
+ call_capacity *= 2;
+ calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
}
+ calls[call_count++] = calld->call;
+ GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
+ is_first = 0;
+ }
+
+ gpr_mu_unlock(&server->mu);
- gpr_cv_wait(&server->cv, &server->mu,
- gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
+ for (i = 0; i < call_count; i++) {
+ grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable");
+ GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
}
+ gpr_free(calls);
+}
+
+void grpc_server_destroy(grpc_server *server) {
+ channel_data *c;
+ listener *l;
+ call_data *calld;
+
+ gpr_mu_lock(&server->mu);
+ GPR_ASSERT(server->shutdown || !server->listeners);
+ GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
+
while (server->listeners) {
l = server->listeners;
server->listeners = l->next;
@@ -962,10 +990,6 @@ void grpc_server_destroy(grpc_server *server) {
while ((calld = call_list_remove_head(&server->lists[PENDING_START],
PENDING_START)) != NULL) {
- /* TODO(dgq): If we knew the size of the call list (or an upper bound), we
- * could allocate all the memory for the closures in advance in a single
- * chunk */
- gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
calld->state = ZOMBIED;
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
@@ -1111,6 +1135,7 @@ static void begin_call(grpc_server *server, call_data *calld,
rc->data.batch.details->deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
+ r->flags = 0;
r++;
publish = publish_registered_or_batch;
break;
@@ -1118,10 +1143,12 @@ static void begin_call(grpc_server *server, call_data *calld,
*rc->data.registered.deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
+ r->flags = 0;
r++;
if (rc->data.registered.optional_payload) {
r->op = GRPC_IOREQ_RECV_MESSAGE;
r->data.recv_message = rc->data.registered.optional_payload;
+ r->flags = 0;
r++;
}
publish = publish_registered_or_batch;
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 5215cc87b1..e080701e2d 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -58,11 +58,18 @@ typedef enum grpc_stream_op_code {
GRPC_OP_SLICE
} grpc_stream_op_code;
+/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
+ * compression for the message */
+#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
+/** Mask of all valid internal flags. */
+#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
+
/* Arguments for GRPC_OP_BEGIN_MESSAGE */
typedef struct grpc_begin_message {
/* How many bytes of data will this message contain */
gpr_uint32 length;
- /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
+ /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits,
+ * GRPC_WRITE_INTERNAL_* for the internal ones. */
gpr_uint32 flags;
} grpc_begin_message;