diff options
Diffstat (limited to 'src/core')
84 files changed, 770 insertions, 452 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 509ae0df89..166d559a45 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -36,6 +36,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include <string.h> @@ -114,3 +115,27 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { } return 0; } + +grpc_compression_level grpc_channel_args_get_compression_level( + const grpc_channel_args *a) { + size_t i; + if (a) { + for (i = 0; a && i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) { + return a->args[i].value.integer; + break; + } + } + } + return GRPC_COMPRESS_LEVEL_NONE; +} + +void grpc_channel_args_set_compression_level( + grpc_channel_args **a, grpc_compression_level level) { + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_LEVEL_ARG; + tmp.value.integer = level; + *a = grpc_channel_args_copy_and_add(*a, &tmp); +} diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index eb5bf63986..bf747b26e6 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -34,21 +34,31 @@ #ifndef GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H #define GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H +#include <grpc/compression.h> #include <grpc/grpc.h> /* Copy some arguments */ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); -/* Copy some arguments and add the to_add parameter in the end. +/** Copy some arguments and add the to_add parameter in the end. If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add); -/* Destroy arguments created by grpc_channel_args_copy */ +/** Destroy arguments created by grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args *a); -/* Reads census_enabled settings from channel args. Returns 1 if census_enabled - is specified in channel args, otherwise returns 0. */ +/** Reads census_enabled settings from channel args. Returns 1 if census_enabled + * is specified in channel args, otherwise returns 0. */ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); +/** Returns the compression level set in \a a. */ +grpc_compression_level grpc_channel_args_get_compression_level( + const grpc_channel_args *a); + +/** Sets the compression level in \a a to \a level. Setting it to + * GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */ +void grpc_channel_args_set_compression_level( + grpc_channel_args **a, grpc_compression_level level); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index cdc1c11681..6690265d75 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -157,17 +157,9 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_child_channel_top_filter = { - lb_start_transport_op, - lb_channel_op, - - sizeof(lb_call_data), - lb_init_call_elem, - lb_destroy_call_elem, - - sizeof(lb_channel_data), - lb_init_channel_elem, - lb_destroy_channel_elem, - + lb_start_transport_op, lb_channel_op, + sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, + sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", }; diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 31b1fc3bde..726196e996 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -102,10 +102,17 @@ struct call_data { static int prepare_activate(grpc_call_element *elem, grpc_child_channel *on_child) { call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; if (calld->state == CALL_CANCELLED) return 0; /* no more access to calld->s.waiting allowed */ GPR_ASSERT(calld->state == CALL_WAITING); + + if (calld->s.waiting_op.bind_pollset) { + grpc_transport_setup_del_interested_party(chand->transport_setup, + calld->s.waiting_op.bind_pollset); + } + calld->state = CALL_ACTIVE; /* create a child call */ @@ -199,6 +206,7 @@ static void cc_start_transport_op(grpc_call_element *elem, handle_op_after_cancellation(elem, op); } else { calld->state = CALL_WAITING; + calld->s.waiting_op.bind_pollset = NULL; if (chand->active_child) { /* channel is connected - use the connected stack */ if (prepare_activate(elem, chand->active_child)) { @@ -230,14 +238,14 @@ static void cc_start_transport_op(grpc_call_element *elem, } calld->s.waiting_op = *op; chand->waiting_children[chand->waiting_child_count++] = calld; + grpc_transport_setup_add_interested_party(chand->transport_setup, + op->bind_pollset); gpr_mu_unlock(&chand->mu); /* finally initiate transport setup if needed */ if (initiate_transport_setup) { grpc_transport_setup_initiate(chand->transport_setup); } - grpc_transport_setup_add_interested_party(chand->transport_setup, - op->bind_pollset); } } break; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index f305d8ba9e..5be8fa66e9 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -56,12 +56,14 @@ struct grpc_client_setup { gpr_cv cv; grpc_client_setup_request *active_request; int refs; + /** The set of pollsets that are currently interested in this + connection being established */ + grpc_pollset_set interested_parties; }; struct grpc_client_setup_request { /* pointer back to the setup object */ grpc_client_setup *setup; - grpc_pollset_set interested_parties; gpr_timespec deadline; }; @@ -71,7 +73,7 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) { grpc_pollset_set *grpc_client_setup_get_interested_parties( grpc_client_setup_request *r) { - return &r->interested_parties; + return &r->setup->interested_parties; } static void destroy_setup(grpc_client_setup *s) { @@ -79,13 +81,11 @@ static void destroy_setup(grpc_client_setup *s) { gpr_cv_destroy(&s->cv); s->done(s->user_data); grpc_channel_args_destroy(s->args); + grpc_pollset_set_destroy(&s->interested_parties); gpr_free(s); } -static void destroy_request(grpc_client_setup_request *r) { - grpc_pollset_set_destroy(&r->interested_parties); - gpr_free(r); -} +static void destroy_request(grpc_client_setup_request *r) { gpr_free(r); } /* initiate handshaking */ static void setup_initiate(grpc_transport_setup *sp) { @@ -94,8 +94,6 @@ static void setup_initiate(grpc_transport_setup *sp) { int in_alarm = 0; r->setup = s; - grpc_pollset_set_init(&r->interested_parties); - /* TODO(klempner): Actually set a deadline */ r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); gpr_mu_lock(&s->mu); @@ -120,33 +118,23 @@ static void setup_initiate(grpc_transport_setup *sp) { } } +/** implementation of add_interested_party for setup vtable */ static void setup_add_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) { grpc_client_setup *s = (grpc_client_setup *)sp; gpr_mu_lock(&s->mu); - if (!s->active_request) { - gpr_mu_unlock(&s->mu); - return; - } - - grpc_pollset_set_add_pollset(&s->active_request->interested_parties, pollset); - + grpc_pollset_set_add_pollset(&s->interested_parties, pollset); gpr_mu_unlock(&s->mu); } +/** implementation of del_interested_party for setup vtable */ static void setup_del_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) { grpc_client_setup *s = (grpc_client_setup *)sp; gpr_mu_lock(&s->mu); - if (!s->active_request) { - gpr_mu_unlock(&s->mu); - return; - } - - grpc_pollset_set_del_pollset(&s->active_request->interested_parties, pollset); - + grpc_pollset_set_del_pollset(&s->interested_parties, pollset); gpr_mu_unlock(&s->mu); } @@ -179,7 +167,8 @@ static void setup_cancel(grpc_transport_setup *sp) { } } -int grpc_client_setup_cb_begin(grpc_client_setup_request *r, const char *reason) { +int grpc_client_setup_cb_begin(grpc_client_setup_request *r, + const char *reason) { gpr_mu_lock(&r->setup->mu); if (r->setup->cancelled) { gpr_mu_unlock(&r->setup->mu); @@ -190,7 +179,8 @@ int grpc_client_setup_cb_begin(grpc_client_setup_request *r, const char *reason) return 1; } -void grpc_client_setup_cb_end(grpc_client_setup_request *r, const char *reason) { +void grpc_client_setup_cb_end(grpc_client_setup_request *r, + const char *reason) { gpr_mu_lock(&r->setup->mu); r->setup->in_cb--; if (r->setup->cancelled) gpr_cv_signal(&r->setup->cv); @@ -223,11 +213,13 @@ void grpc_client_setup_create_and_attach( s->in_alarm = 0; s->in_cb = 0; s->cancelled = 0; + grpc_pollset_set_init(&s->interested_parties); grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base); } -int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, const char *reason) { +int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, + const char *reason) { int result; if (gpr_time_cmp(gpr_now(), r->deadline) > 0) { result = 0; @@ -239,7 +231,8 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, cons return result; } -static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) { +static void backoff_alarm_done(void *arg /* grpc_client_setup_request */, + int success) { grpc_client_setup_request *r = arg; grpc_client_setup *s = r->setup; /* Handle status cancelled? */ diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h index cbabb510b1..7d40338840 100644 --- a/src/core/channel/client_setup.h +++ b/src/core/channel/client_setup.h @@ -52,7 +52,8 @@ void grpc_client_setup_create_and_attach( /* Check that r is the active request: needs to be performed at each callback. If this races, we'll have two connection attempts running at once and the old one will get cleaned up in due course, which is fine. */ -int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, const char *reason); +int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, + const char *reason); void grpc_client_setup_request_finish(grpc_client_setup_request *r, int was_successful); const grpc_channel_args *grpc_client_setup_get_channel_args( @@ -61,7 +62,8 @@ const grpc_channel_args *grpc_client_setup_get_channel_args( /* Call before calling back into the setup listener, and call only if this function returns 1. If it returns 1, also promise to call grpc_client_setup_cb_end */ -int grpc_client_setup_cb_begin(grpc_client_setup_request *r, const char *reason); +int grpc_client_setup_cb_begin(grpc_client_setup_request *r, + const char *reason); void grpc_client_setup_cb_end(grpc_client_setup_request *r, const char *reason); /* Get the deadline for a request passed in to initiate. Implementations should diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index ca07002ff9..4db48df6cb 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -31,7 +31,8 @@ * */ -#include "src/core/compression/algorithm.h" +#include <stdlib.h> +#include <grpc/compression.h> const char *grpc_compression_algorithm_name( grpc_compression_algorithm algorithm) { @@ -47,3 +48,20 @@ const char *grpc_compression_algorithm_name( } return "error"; } + +/* TODO(dgq): Add the ability to specify parameters to the individual + * compression algorithms */ +grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level) { + switch (level) { + case GRPC_COMPRESS_LEVEL_NONE: + return GRPC_COMPRESS_NONE; + case GRPC_COMPRESS_LEVEL_LOW: + case GRPC_COMPRESS_LEVEL_MED: + case GRPC_COMPRESS_LEVEL_HIGH: + return GRPC_COMPRESS_DEFLATE; + default: + /* we shouldn't be making it here */ + abort(); + } +} diff --git a/src/core/compression/algorithm.h b/src/core/compression/algorithm.h deleted file mode 100644 index 9dd9f57b56..0000000000 --- a/src/core/compression/algorithm.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H -#define GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H - -/* The various compression algorithms supported by GRPC */ -typedef enum { - GRPC_COMPRESS_NONE = 0, - GRPC_COMPRESS_DEFLATE, - GRPC_COMPRESS_GZIP, - /* TODO(ctiller): snappy */ - GRPC_COMPRESS_ALGORITHMS_COUNT -} grpc_compression_algorithm; - -const char *grpc_compression_algorithm_name( - grpc_compression_algorithm algorithm); - -#endif /* GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H */ diff --git a/src/core/compression/message_compress.h b/src/core/compression/message_compress.h index e8aef1a713..b3eb8f579f 100644 --- a/src/core/compression/message_compress.h +++ b/src/core/compression/message_compress.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H #define GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H -#include "src/core/compression/algorithm.h" +#include <grpc/compression.h> #include <grpc/support/slice_buffer.h> /* compress 'input' to 'output' using 'algorithm'. @@ -49,4 +49,4 @@ int grpc_msg_compress(grpc_compression_algorithm algorithm, int grpc_msg_decompress(grpc_compression_algorithm algorithm, gpr_slice_buffer *input, gpr_slice_buffer *output); -#endif /* GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H */ +#endif /* GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H */ diff --git a/src/core/debug/trace.c b/src/core/debug/trace.c index 32c35e7fb3..b53dfe804b 100644 --- a/src/core/debug/trace.c +++ b/src/core/debug/trace.c @@ -35,6 +35,7 @@ #include <string.h> +#include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "src/core/support/env.h" @@ -80,27 +81,10 @@ static void parse(const char *s) { char **strings = NULL; size_t nstrings = 0; size_t i; - tracer *t; split(s, &strings, &nstrings); for (i = 0; i < nstrings; i++) { - const char *s = strings[i]; - if (0 == strcmp(s, "all")) { - for (t = tracers; t; t = t->next) { - *t->flag = 1; - } - } else { - int found = 0; - for (t = tracers; t; t = t->next) { - if (0 == strcmp(s, t->name)) { - *t->flag = 1; - found = 1; - } - } - if (!found) { - gpr_log(GPR_ERROR, "Unknown trace var: '%s'", s); - } - } + grpc_tracer_set_enabled(strings[i], 1); } for (i = 0; i < nstrings; i++) { @@ -115,9 +99,34 @@ void grpc_tracer_init(const char *env_var) { parse(e); gpr_free(e); } +} + +void grpc_tracer_shutdown(void) { while (tracers) { tracer *t = tracers; tracers = t->next; gpr_free(t); } } + +int grpc_tracer_set_enabled(const char *name, int enabled) { + tracer *t; + if (0 == strcmp(name, "all")) { + for (t = tracers; t; t = t->next) { + *t->flag = 1; + } + } else { + int found = 0; + for (t = tracers; t; t = t->next) { + if (0 == strcmp(name, t->name)) { + *t->flag = enabled; + found = 1; + } + } + if (!found) { + gpr_log(GPR_ERROR, "Unknown trace var: '%s'", name); + return 0; /* early return */ + } + } + return 1; +} diff --git a/src/core/debug/trace.h b/src/core/debug/trace.h index c02f14b7f2..fc8615bc69 100644 --- a/src/core/debug/trace.h +++ b/src/core/debug/trace.h @@ -38,5 +38,6 @@ void grpc_register_tracer(const char *name, int *flag); void grpc_tracer_init(const char *env_var_name); +void grpc_tracer_shutdown(void); #endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */ diff --git a/src/core/httpcli/format_request.c b/src/core/httpcli/format_request.c index af25219084..e875423e87 100644 --- a/src/core/httpcli/format_request.c +++ b/src/core/httpcli/format_request.c @@ -40,6 +40,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/slice.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) { diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index d39bcfe55d..914355a408 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -46,6 +46,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> typedef struct { gpr_slice request_text; @@ -61,6 +62,7 @@ typedef struct { void *user_data; grpc_httpcli_context *context; grpc_pollset *pollset; + grpc_iomgr_object iomgr_obj; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -88,6 +90,7 @@ static void finish(internal_request *req, int success) { } gpr_slice_unref(req->request_text); gpr_free(req->host); + grpc_iomgr_unregister_object(&req->iomgr_obj); gpr_free(req); } @@ -229,6 +232,7 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { internal_request *req; + char *name; if (g_get_override && g_get_override(request, deadline, on_response, user_data)) { return; @@ -243,6 +247,9 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, req->use_ssl = request->use_ssl; req->context = context; req->pollset = pollset; + gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + grpc_iomgr_register_object(&req->iomgr_obj, name); + gpr_free(name); if (req->use_ssl) { req->host = gpr_strdup(request->host); } @@ -258,6 +265,7 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { internal_request *req; + char *name; if (g_post_override && g_post_override(request, body_bytes, body_size, deadline, on_response, user_data)) { return; @@ -273,10 +281,14 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, req->use_ssl = request->use_ssl; req->context = context; req->pollset = pollset; + gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + grpc_iomgr_register_object(&req->iomgr_obj, name); + gpr_free(name); if (req->use_ssl) { req->host = gpr_strdup(request->host); } + grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->use_ssl ? "https" : "http", on_resolved, req); } diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index cee374fe8e..06699e88c2 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -93,6 +93,10 @@ void grpc_httpcli_context_init(grpc_httpcli_context *context); void grpc_httpcli_context_destroy(grpc_httpcli_context *context); /* Asynchronously perform a HTTP GET. + 'context' specifies the http context under which to do the get + 'pollset' indicates a grpc_pollset that is interested in the result + of the get - work on this pollset may be used to progress the get + operation 'request' contains request parameters - these are caller owned and can be destroyed once the call returns 'deadline' contains a deadline for the request (or gpr_inf_future) @@ -106,7 +110,19 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, grpc_httpcli_response_cb on_response, void *user_data); /* Asynchronously perform a HTTP POST. - When there is no body, pass in NULL as body_bytes. + 'context' specifies the http context under which to do the post + 'pollset' indicates a grpc_pollset that is interested in the result + of the post - work on this pollset may be used to progress the post + operation + 'request' contains request parameters - these are caller owned and can be + destroyed once the call returns + 'body_bytes' and 'body_size' specify the payload for the post. + When there is no body, pass in NULL as body_bytes. + 'deadline' contains a deadline for the request (or gpr_inf_future) + 'em' points to a caller owned event manager that must be alive for the + lifetime of the request + 'on_response' is a callback to report results to (and 'user_data' is a user + supplied pointer to pass to said call) Does not support ?var1=val1&var2=val2 in the path. */ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index 6eed5eaf12..ce0d3d5a70 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -39,6 +39,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/tsi/ssl_transport_security.h" typedef struct { diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index 9b3b63f1e7..fa2d2555d6 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -47,6 +47,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> static void create_sockets(int sv[2]) { int flags; diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2ac1866a66..347d8793c8 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -112,7 +112,8 @@ static void destroy(grpc_fd *fd) { #ifdef GRPC_FD_REF_COUNT_DEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) -static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { +static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); @@ -125,7 +126,8 @@ static void ref_by(grpc_fd *fd, int n) { } #ifdef GRPC_FD_REF_COUNT_DEBUG -static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { +static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { gpr_atm old; gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -225,7 +227,7 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif static void process_callback(grpc_iomgr_closure *closure, int success, - int allow_synchronous_callback) { + int allow_synchronous_callback) { if (allow_synchronous_callback) { closure->cb(closure->cb_arg, success); } else { @@ -265,7 +267,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback); + allow_synchronous_callback); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -309,7 +311,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure* closure; + grpc_iomgr_closure *closure; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 523d040d17..94d0019fa4 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -62,12 +62,12 @@ struct grpc_fd { gpr_atm shutdown; /* The watcher list. - + The following watcher related fields are protected by watcher_mu. - + An fd_watcher is an ephemeral object created when an fd wants to begin polling, and destroyed after the poll. - + It denotes the fd's interest in whether to read poll or write poll or both or neither on this fd. @@ -175,4 +175,4 @@ void grpc_fd_unref(grpc_fd *fd); void grpc_fd_global_init(void); void grpc_fd_global_shutdown(void); -#endif /* GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index c6c44658df..c47528aa94 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -40,8 +40,9 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/thd.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> +#include <grpc/support/thd.h> static gpr_mu g_mu; static gpr_cv g_rcv; @@ -143,8 +144,8 @@ void grpc_iomgr_shutdown(void) { } if (g_root_object.next != &g_root_object) { int timeout = 0; - gpr_timespec short_deadline = gpr_time_add(gpr_now(), - gpr_time_from_millis(100)); + gpr_timespec short_deadline = + gpr_time_add(gpr_now(), gpr_time_from_millis(100)); while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) { timeout = 1; @@ -193,7 +194,6 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); } - void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; @@ -217,12 +217,10 @@ void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { gpr_mu_unlock(&g_mu); } - void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */); } - int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 334e0ebde1..7472b6144f 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -58,7 +58,6 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, void *shutdown_done_arg); void grpc_pollset_destroy(grpc_pollset *pollset); - /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file descriptors. @@ -66,8 +65,8 @@ void grpc_pollset_destroy(grpc_pollset *pollset); May unlock GRPC_POLLSET_MU(pollset) during its execution. */ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); -/* Break a pollset out of polling work +/* Break one polling thread out of polling work for this pollset. Requires GRPC_POLLSET_MU(pollset) locked. */ void grpc_pollset_kick(grpc_pollset *pollset); -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c index 21c9bb6542..51021784f2 100644 --- a/src/core/iomgr/pollset_kick_posix.c +++ b/src/core/iomgr/pollset_kick_posix.c @@ -73,7 +73,7 @@ static grpc_kick_fd_info *allocate_wfd(void) { return info; } -static void destroy_wfd(grpc_kick_fd_info* wfd) { +static void destroy_wfd(grpc_kick_fd_info *wfd) { grpc_wakeup_fd_destroy(&wfd->wakeup_fd); gpr_free(wfd); } @@ -104,7 +104,8 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) { GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list); } -grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) { +grpc_kick_fd_info *grpc_pollset_kick_pre_poll( + grpc_pollset_kick_state *kick_state) { grpc_kick_fd_info *fd_info; gpr_mu_lock(&kick_state->mu); if (kick_state->kicked) { @@ -120,11 +121,13 @@ grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_stat return fd_info; } -void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info) { +void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info) { grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd); } -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info) { +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info) { gpr_mu_lock(&kick_state->mu); fd_info->next->prev = fd_info->prev; fd_info->prev->next = fd_info->next; @@ -162,5 +165,4 @@ void grpc_pollset_kick_global_destroy(void) { gpr_mu_destroy(&fd_freelist_mu); } - -#endif /* GPR_POSIX_SOCKET */ +#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h index b35c2cfbe0..77e32a8d51 100644 --- a/src/core/iomgr/pollset_kick_posix.h +++ b/src/core/iomgr/pollset_kick_posix.h @@ -37,6 +37,11 @@ #include "src/core/iomgr/wakeup_fd_posix.h" #include <grpc/support/sync.h> +/* pollset kicking allows breaking a thread out of polling work for + a given pollset. + writing a byte to a pipe is used as a posix-ly portable base + mechanism, and eventfds are utilized on Linux for better performance. */ + typedef struct grpc_kick_fd_info { grpc_wakeup_fd_info wakeup_fd; /* used for polling list and free list */ @@ -51,7 +56,8 @@ typedef struct grpc_pollset_kick_state { struct grpc_kick_fd_info fd_list; } grpc_pollset_kick_state; -#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd) +#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \ + GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd) /* This is an abstraction around the typical pipe mechanism for waking up a thread sitting in a poll() style call. */ @@ -66,18 +72,22 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); * applicable. Intended for testing. */ void grpc_pollset_kick_global_init_fallback_fd(void); -/* Must be called before entering poll(). If return value is -1, this consumed +/* Must be called before entering poll(). If return value is NULL, this consumed an existing kick. Otherwise the return value is an FD to add to the poll set. */ -grpc_kick_fd_info *grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state); +grpc_kick_fd_info *grpc_pollset_kick_pre_poll( + grpc_pollset_kick_state *kick_state); /* Consume an existing kick. Must be called after poll returns that the fd was readable, and before calling kick_post_poll. */ -void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info); +void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info); /* Must be called after pre_poll, and after consume if applicable */ -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info); +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, + grpc_kick_fd_info *fd_info); +/* Actually kick */ void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state); -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 05b522bbf2..b4a526b9e7 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -97,14 +97,7 @@ static int multipoll_with_epoll_pollset_maybe_work( * here. */ - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout_ms = -1; - } else { - timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout_ms <= 0) { - return 1; - } - } + timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); pollset->counter += 1; gpr_mu_unlock(&pollset->mu); @@ -143,7 +136,8 @@ static int multipoll_with_epoll_pollset_maybe_work( return 1; } -static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset) {} +static void multipoll_with_epoll_pollset_finish_shutdown( + grpc_pollset *pollset) {} static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { pollset_hdr *h = pollset->data.ptr; @@ -158,9 +152,12 @@ static void epoll_kick(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, - multipoll_with_epoll_pollset_maybe_work, epoll_kick, - multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; + multipoll_with_epoll_pollset_add_fd, + multipoll_with_epoll_pollset_del_fd, + multipoll_with_epoll_pollset_maybe_work, + epoll_kick, + multipoll_with_epoll_pollset_finish_shutdown, + multipoll_with_epoll_pollset_destroy}; static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, size_t nfds) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index afce5f65db..2f108da66a 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -113,14 +113,7 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_kick_fd_info *kfd; h = pollset->data.ptr; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); if (h->pfd_capacity < h->fd_count + 1) { h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); gpr_free(h->pfds); @@ -231,9 +224,12 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, - multipoll_with_poll_pollset_maybe_work, multipoll_with_poll_pollset_kick, - multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy}; + multipoll_with_poll_pollset_add_fd, + multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_kick, + multipoll_with_poll_pollset_finish_shutdown, + multipoll_with_poll_pollset_destroy}; void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, size_t nfds) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 796b1b9ebf..46d3d132ce 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -99,6 +99,7 @@ void grpc_pollset_init(grpc_pollset *pollset) { grpc_pollset_kick_init(&pollset->kick_state); pollset->in_flight_cbs = 0; pollset->shutting_down = 0; + pollset->called_shutdown = 0; become_basic_pollset(pollset, NULL); } @@ -141,7 +142,8 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { if (pollset->shutting_down) { if (pollset->counter > 0) { grpc_pollset_kick(pollset); - } else if (pollset->in_flight_cbs == 0) { + } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(pollset); /* Continuing to access pollset here is safe -- it is the caller's @@ -157,21 +159,23 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { void grpc_pollset_shutdown(grpc_pollset *pollset, void (*shutdown_done)(void *arg), void *shutdown_done_arg) { - int in_flight_cbs; - int counter; + int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; - in_flight_cbs = pollset->in_flight_cbs; - counter = pollset->counter; + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + pollset->counter == 0) { + pollset->called_shutdown = 1; + call_shutdown = 1; + } pollset->shutdown_done_cb = shutdown_done; pollset->shutdown_done_arg = shutdown_done_arg; - if (counter > 0) { + if (pollset->counter > 0) { grpc_pollset_kick(pollset); } gpr_mu_unlock(&pollset->mu); - if (in_flight_cbs == 0 && counter == 0) { + if (call_shutdown) { finish_shutdown(pollset); } } @@ -184,6 +188,22 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { + gpr_timespec timeout; + static const int max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + return -1; + } + if (gpr_time_cmp( + deadline, + gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis( + gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1))); +} + /* * basic_pollset - a vtable that provides polling for zero or one file * descriptor via poll() @@ -340,14 +360,7 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset, GRPC_FD_UNREF(fd, "basicpoll"); fd = pollset->data.ptr = NULL; } - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { - return 1; - } - } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); if (kfd == NULL) { /* Already kicked */ @@ -417,7 +430,7 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, + basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 2b897caa4b..ba3d638d41 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_pollset { int counter; int in_flight_cbs; int shutting_down; + int called_shutdown; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; union { @@ -93,6 +94,15 @@ int grpc_kick_read_fd(grpc_pollset *p); /* Call after polling has been kicked to leave the kicked state */ void grpc_kick_drain(grpc_pollset *p); +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); + /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset, struct grpc_fd **fds, diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 335ffb21b9..98e3b552a7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -39,7 +39,7 @@ /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any fd's (etc) that have been registered with the set_set with that pollset. - Registering fd's automatically iterates all current pollsets. */ + Registering fd's automatically adds them to all current pollsets. */ #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/pollset_set_posix.h" diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index c00d6e6e57..005e938398 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -49,7 +49,11 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { + size_t i; gpr_mu_destroy(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->fds); } @@ -95,6 +99,7 @@ void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { pollset_set->fds = gpr_realloc( pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); } + GRPC_FD_REF(fd, "pollset_set"); pollset_set->fds[pollset_set->fd_count++] = fd; for (i = 0; i < pollset_set->pollset_count; i++) { grpc_pollset_add_fd(pollset_set->pollsets[i], fd); @@ -110,6 +115,7 @@ void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { pollset_set->fd_count--; GPR_SWAP(grpc_fd *, pollset_set->fds[i], pollset_set->fds[pollset_set->pollset_count]); + GRPC_FD_UNREF(fd, "pollset_set"); break; } } diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index d295c64b5e..b9c209cd2c 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -44,4 +44,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {} void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset) {} +void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, + grpc_pollset *pollset) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index 0f91f1ede7..cada0d2b61 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -34,8 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { - void *unused; -} grpc_pollset_set; +typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 88db774cc4..9deb0fa8fa 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -46,9 +46,7 @@ set of features for the sake of the rest of grpc. But grpc_pollset_work won't actually do any polling, and return as quickly as possible. */ -void grpc_pollset_init(grpc_pollset *pollset) { - gpr_mu_init(&pollset->mu); -} +void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); } void grpc_pollset_shutdown(grpc_pollset *pollset, void (*shutdown_done)(void *arg), @@ -75,6 +73,6 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { return 0 /* GPR_FALSE */; } -void grpc_pollset_kick(grpc_pollset *p) { } +void grpc_pollset_kick(grpc_pollset *p) {} -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index acd82d0a0a..cbbd9efdd1 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -37,7 +37,6 @@ #include <windows.h> #include <grpc/support/sync.h> -#include "src/core/iomgr/pollset_kick.h" #include "src/core/iomgr/socket_windows.h" /* There isn't really any such thing as a pollset under Windows, due to the @@ -45,10 +44,8 @@ and a condition variable, as this is the minimal set of features we need implemented for the rest of grpc. But we won't use them directly. */ -typedef struct grpc_pollset { - gpr_mu mu; -} grpc_pollset; +typedef struct grpc_pollset { gpr_mu mu; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index fcf48fe0d7..20d8c58eb4 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -47,6 +47,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 7d0d2f9e7a..fb5fd0d4f6 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -46,6 +46,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> @@ -134,9 +135,9 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); + grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); cb(arg, resolved); - grpc_iomgr_unregister_object(&r->iomgr_object); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 3d202a5cc8..e91b94f8c8 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -40,6 +40,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> +#include <grpc/support/string_util.h> static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index e4ba0a2b66..fbf3fdc949 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -85,13 +85,13 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) { both memory and sockets. */ void grpc_winsocket_orphan(grpc_winsocket *winsocket) { SOCKET socket = winsocket->socket; + grpc_iomgr_unregister_object(&winsocket->iomgr_object); if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { grpc_iocp_socket_orphan(winsocket); } else { grpc_winsocket_destroy(winsocket); } closesocket(socket); - grpc_iomgr_unregister_object(&winsocket->iomgr_object); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index e1fdf235ec..0fa08b52b0 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -41,10 +41,12 @@ /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and - NULL on failure) */ + NULL on failure). + interested_parties points to a set of pollsets that would be interested + in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, const struct sockaddr *addr, int addr_len, gpr_timespec deadline); -#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 2cd4aa2f45..bbf7711588 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -51,6 +51,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> typedef struct { @@ -222,8 +223,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); goto done; } diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 88141a3dc2..b1a169b519 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -52,7 +52,7 @@ #include "src/core/iomgr/socket_windows.h" typedef struct { - void(*cb)(void *arg, grpc_endpoint *tcp); + void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; gpr_mu mu; grpc_winsocket *socket; @@ -86,7 +86,7 @@ static void on_connect(void *acp, int from_iocp) { SOCKET sock = ac->socket->socket; grpc_endpoint *ep = NULL; grpc_winsocket_callback_info *info = &ac->socket->write_info; - void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; int aborted; @@ -99,8 +99,7 @@ static void on_connect(void *acp, int from_iocp) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, - &transfered_bytes, FALSE, - &flags); + &transfered_bytes, FALSE, &flags); info->outstanding = 0; GPR_ASSERT(transfered_bytes == 0); if (!wsa_success) { @@ -176,9 +175,9 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), /* Grab the function pointer for ConnectEx for that specific socket. It may change depending on the interface. */ - status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, - &guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx), - &ioctl_num_bytes, NULL, NULL); + status = + WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &ConnectEx, sizeof(ConnectEx), &ioctl_num_bytes, NULL, NULL); if (status != 0) { message = "Unable to retrieve ConnectEx pointer: %s"; @@ -187,8 +186,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), grpc_sockaddr_make_wildcard6(0, &local_address); - status = bind(sock, (struct sockaddr *) &local_address, - sizeof(local_address)); + status = bind(sock, (struct sockaddr *)&local_address, sizeof(local_address)); if (status != 0) { message = "Unable to bind socket: %s"; goto failure; @@ -234,4 +232,4 @@ failure: cb(arg, NULL); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index e3289f6806..9ad089af66 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -266,7 +266,7 @@ typedef struct { grpc_endpoint base; grpc_fd *em_fd; int fd; - int iov_size; /* Number of slices to allocate per read attempt */ + int iov_size; /* Number of slices to allocate per read attempt */ int finished_edge; size_t slice_size; gpr_refcount refcount; @@ -412,8 +412,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { ++tcp->iov_size; } GPR_ASSERT(slice_state_has_available(&read_state)); - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); + slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); slice_state_destroy(&read_state); grpc_tcp_unref(tcp); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 759a493795..5854031c9b 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -63,6 +63,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -107,6 +108,7 @@ struct grpc_tcp_server { /* destroyed port count: how many ports are completely destroyed */ size_t destroyed_ports; + /* is this server shutting down? (boolean) */ int shutdown; /* all listening ports */ @@ -118,7 +120,9 @@ struct grpc_tcp_server { void (*shutdown_complete)(void *); void *shutdown_complete_arg; + /* all pollsets interested in new connections */ grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ size_t pollset_count; }; @@ -159,6 +163,9 @@ static void destroyed_port(void *server, int success) { static void dont_care_about_shutdown_completion(void *ignored) {} +/* called when all listening endpoints have been shutdown, so no further + events will be received on them - at this point it's safe to destroy + things */ static void deactivated_all_ports(grpc_tcp_server *s) { size_t i; @@ -335,9 +342,8 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb( - sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + sp->server->cb(sp->server->cb_arg, + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 9ef369dfd8..d70968de88 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -41,6 +41,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_win32.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 3341f558a3..15759c398a 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -41,6 +41,7 @@ #include <grpc/support/log.h> #include <grpc/support/log_win32.h> #include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/iomgr/alarm.h" @@ -153,7 +154,7 @@ static void on_read(void *tcpp, int from_iocp) { status = GRPC_ENDPOINT_CB_ERROR; } else { if (info->bytes_transfered != 0) { - sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); + sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); status = GRPC_ENDPOINT_CB_OK; slice = ⊂ nslices = 1; diff --git a/src/core/json/json.h b/src/core/json/json.h index 69cbac17dc..b78b42a5b2 100644 --- a/src/core/json/json.h +++ b/src/core/json/json.h @@ -60,7 +60,7 @@ typedef struct grpc_json { * strings in the tree. The input stream's UTF-8 isn't validated, * as in, what you input is what you get as an output. * - * All the keys and values in the grpc_json_t objects will be strings + * All the keys and values in the grpc_json objects will be strings * pointing at your input buffer. * * Delete the allocated tree afterward using grpc_json_destroy(). diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 9d55362da6..e9bd45db68 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/string.h" #include "src/core/channel/channel_stack.h" @@ -52,6 +53,10 @@ typedef struct { grpc_credentials *creds; grpc_mdstr *host; grpc_mdstr *method; + /* pollset bound to this call; if we need to make external + network requests, they should be done under this pollset + so that work can progress when this call wants work to + progress */ grpc_pollset *pollset; grpc_transport_op op; size_t op_md_idx; @@ -302,13 +307,10 @@ static void init_channel_elem(grpc_channel_element *elem, chand->security_connector = (grpc_channel_security_connector *)grpc_security_connector_ref(sc); chand->md_ctx = metadata_context; - chand->authority_string = - grpc_mdstr_from_string(chand->md_ctx, ":authority"); + chand->authority_string = grpc_mdstr_from_string(chand->md_ctx, ":authority"); chand->path_string = grpc_mdstr_from_string(chand->md_ctx, ":path"); - chand->error_msg_key = - grpc_mdstr_from_string(chand->md_ctx, "grpc-message"); - chand->status_key = - grpc_mdstr_from_string(chand->md_ctx, "grpc-status"); + chand->error_msg_key = grpc_mdstr_from_string(chand->md_ctx, "grpc-message"); + chand->status_key = grpc_mdstr_from_string(chand->md_ctx, "grpc-status"); } /* Destructor for channel data */ @@ -332,6 +334,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "client-auth"}; + auth_start_transport_op, channel_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "client-auth"}; diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index f3d0cf5452..cf663faf2d 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -46,6 +46,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -485,8 +486,8 @@ static int oauth2_token_fetcher_has_request_metadata_only( grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( - const grpc_httpcli_response *response, - grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime) { + const grpc_httpcli_response *response, grpc_credentials_md_store **token_md, + gpr_timespec *token_lifetime) { char *null_terminated_body = NULL; char *new_access_token = NULL; grpc_credentials_status status = GRPC_CREDENTIALS_OK; @@ -609,7 +610,8 @@ static void oauth2_token_fetcher_get_request_metadata( if (c->access_token_md != NULL && (gpr_time_cmp(gpr_time_sub(c->token_expiration, gpr_now()), refresh_threshold) > 0)) { - cached_access_token_md = grpc_credentials_md_store_ref(c->access_token_md); + cached_access_token_md = + grpc_credentials_md_store_ref(c->access_token_md); } gpr_mu_unlock(&c->mu); } @@ -639,8 +641,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, /* -- ComputeEngine credentials. -- */ static grpc_credentials_vtable compute_engine_vtable = { - oauth2_token_fetcher_destroy, - oauth2_token_fetcher_has_request_metadata, + oauth2_token_fetcher_destroy, oauth2_token_fetcher_has_request_metadata, oauth2_token_fetcher_has_request_metadata_only, oauth2_token_fetcher_get_request_metadata, NULL}; @@ -685,8 +686,7 @@ static void service_account_destroy(grpc_credentials *creds) { } static grpc_credentials_vtable service_account_vtable = { - service_account_destroy, - oauth2_token_fetcher_has_request_metadata, + service_account_destroy, oauth2_token_fetcher_has_request_metadata, oauth2_token_fetcher_has_request_metadata_only, oauth2_token_fetcher_get_request_metadata, NULL}; @@ -759,8 +759,7 @@ static void refresh_token_destroy(grpc_credentials *creds) { } static grpc_credentials_vtable refresh_token_vtable = { - refresh_token_destroy, - oauth2_token_fetcher_has_request_metadata, + refresh_token_destroy, oauth2_token_fetcher_has_request_metadata, oauth2_token_fetcher_has_request_metadata_only, oauth2_token_fetcher_get_request_metadata, NULL}; @@ -899,8 +898,7 @@ static int fake_transport_security_has_request_metadata_only( return 0; } -static grpc_security_status -fake_transport_security_create_security_connector( +static grpc_security_status fake_transport_security_create_security_connector( grpc_credentials *c, const char *target, const grpc_channel_args *args, grpc_credentials *request_metadata_creds, grpc_channel_security_connector **sc, grpc_channel_args **new_args) { diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 9c876d4226..75af73a0c6 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -108,7 +108,6 @@ grpc_credentials_md_store *grpc_credentials_md_store_ref( grpc_credentials_md_store *store); void grpc_credentials_md_store_unref(grpc_credentials_md_store *store); - /* --- grpc_credentials. --- */ /* It is the caller's responsibility to gpr_free the result if not NULL. */ @@ -177,8 +176,8 @@ grpc_credentials *grpc_credentials_contains_type( /* Exposed for testing only. */ grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( - const struct grpc_httpcli_response *response, grpc_credentials_md_store **token_md, - gpr_timespec *token_lifetime); + const struct grpc_httpcli_response *response, + grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime); /* Simulates an oauth2 token fetch with the specified value for testing. */ grpc_credentials *grpc_fake_oauth2_credentials_create( diff --git a/src/core/security/credentials_posix.c b/src/core/security/credentials_posix.c index 79622cb024..20f67a7f14 100644 --- a/src/core/security/credentials_posix.c +++ b/src/core/security/credentials_posix.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/env.h" #include "src/core/support/string.h" diff --git a/src/core/security/credentials_win32.c b/src/core/security/credentials_win32.c index ddb310468b..92dfd9bdfe 100644 --- a/src/core/security/credentials_win32.c +++ b/src/core/security/credentials_win32.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/env.h" #include "src/core/support/string.h" diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 5d40627ba4..5822ce6337 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -55,9 +55,7 @@ static int compute_engine_detection_done = 0; static gpr_mu g_mu; static gpr_once g_once = GPR_ONCE_INIT; -static void init_default_credentials(void) { - gpr_mu_init(&g_mu); -} +static void init_default_credentials(void) { gpr_mu_init(&g_mu); } typedef struct { grpc_pollset pollset; diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c index eadae33609..6116f1d767 100644 --- a/src/core/security/json_token.c +++ b/src/core/security/json_token.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/security/base64.h" #include "src/core/support/string.h" diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 11505f8cb0..4098636a2e 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -47,6 +47,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> #include "src/core/tsi/fake_transport_security.h" #include "src/core/tsi/ssl_transport_security.h" diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c index 14c194c8f6..9aba1e7f91 100644 --- a/src/core/security/security_context.c +++ b/src/core/security/security_context.c @@ -40,6 +40,7 @@ #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> /* --- grpc_call --- */ diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 1823f75808..b19160b8ed 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -78,7 +78,6 @@ static void init_call_elem(grpc_call_element *elem, calld->unused = 0; GPR_ASSERT(initial_op && initial_op->context != NULL && - chand->security_connector->auth_context != NULL && initial_op->context[GRPC_CONTEXT_SECURITY].value == NULL); /* Create a security context for the call and reference the auth context from diff --git a/src/core/support/cmdline.c b/src/core/support/cmdline.c index 530952c437..4baad85040 100644 --- a/src/core/support/cmdline.c +++ b/src/core/support/cmdline.c @@ -40,6 +40,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> typedef enum { ARGTYPE_INT, ARGTYPE_BOOL, ARGTYPE_STRING } argtype; diff --git a/src/core/support/env_linux.c b/src/core/support/env_linux.c index bdadfb6ca4..2e03365e33 100644 --- a/src/core/support/env_linux.c +++ b/src/core/support/env_linux.c @@ -45,6 +45,7 @@ #include <stdlib.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/string.h" diff --git a/src/core/support/env_posix.c b/src/core/support/env_posix.c index 45f89b6737..1dd2af56bc 100644 --- a/src/core/support/env_posix.c +++ b/src/core/support/env_posix.c @@ -42,6 +42,7 @@ #include <grpc/support/log.h> #include "src/core/support/string.h" +#include <grpc/support/string_util.h> char *gpr_getenv(const char *name) { char *result = getenv(name); diff --git a/src/core/support/env_win32.c b/src/core/support/env_win32.c index 9b4cd698ad..6b1ff102b0 100644 --- a/src/core/support/env_win32.c +++ b/src/core/support/env_win32.c @@ -42,6 +42,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> char *gpr_getenv(const char *name) { size_t size; diff --git a/src/core/support/file.c b/src/core/support/file.c index 3a4ac6f2f0..8ce7a67fb1 100644 --- a/src/core/support/file.c +++ b/src/core/support/file.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/string.h" diff --git a/src/core/support/file_posix.c b/src/core/support/file_posix.c index 11a459ad36..c11c07148a 100644 --- a/src/core/support/file_posix.c +++ b/src/core/support/file_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/string.h" diff --git a/src/core/support/file_win32.c b/src/core/support/file_win32.c index f59d3af397..355744f79a 100644 --- a/src/core/support/file_win32.c +++ b/src/core/support/file_win32.c @@ -42,6 +42,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/support/file.h" #include "src/core/support/string_win32.h" diff --git a/src/core/support/host_port.c b/src/core/support/host_port.c index fa49f1a33a..53669f063b 100644 --- a/src/core/support/host_port.c +++ b/src/core/support/host_port.c @@ -38,6 +38,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> int gpr_join_host_port(char **out, const char *host, int port) { if (host[0] != '[' && strchr(host, ':') != NULL) { diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index 159c7e052c..d249be7d2e 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -42,6 +42,7 @@ #include <grpc/support/log_win32.h> #include <grpc/support/log.h> #include <grpc/support/time.h> +#include <grpc/support/string_util.h> #include "src/core/support/string.h" #include "src/core/support/string_win32.h" @@ -93,23 +94,22 @@ void gpr_default_log(gpr_log_func_args *args) { fprintf(stderr, "%s%s.%09u %5lu %s:%d] %s\n", gpr_log_severity_string(args->severity), time_buffer, - (int)(now.tv_nsec), GetCurrentThreadId(), - args->file, args->line, args->message); + (int)(now.tv_nsec), GetCurrentThreadId(), args->file, args->line, + args->message); } char *gpr_format_message(DWORD messageid) { LPTSTR tmessage; char *message; - DWORD status = FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, messageid, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPTSTR)(&tmessage), 0, NULL); - if (status == 0) return gpr_strdup("Unable to retreive error string"); + DWORD status = FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPTSTR)(&tmessage), 0, NULL); + if (status == 0) return gpr_strdup("Unable to retrieve error string"); message = gpr_tchar_to_char(tmessage); LocalFree(tmessage); return message; } -#endif /* GPR_WIN32 */ +#endif /* GPR_WIN32 */ diff --git a/src/core/support/string.h b/src/core/support/string.h index faf3342708..31e9fcb5e9 100644 --- a/src/core/support/string.h +++ b/src/core/support/string.h @@ -44,10 +44,6 @@ extern "C" { /* String utility functions */ -/* Returns a copy of src that can be passed to gpr_free(). - If allocation fails or if src is NULL, returns NULL. */ -char *gpr_strdup(const char *src); - /* flag to include plaintext after a hexdump */ #define GPR_HEXDUMP_PLAINTEXT 0x00000001 @@ -71,16 +67,6 @@ int gpr_ltoa(long value, char *output); /* Reverse a run of bytes */ void gpr_reverse_bytes(char *str, int len); -/* printf to a newly-allocated string. The set of supported formats may vary - between platforms. - - On success, returns the number of bytes printed (excluding the final '\0'), - and *strp points to a string which must later be destroyed with gpr_free(). - - On error, returns -1 and sets *strp to NULL. If the format string is bad, - the result is undefined. */ -int gpr_asprintf(char **strp, const char *format, ...); - /* Join a set of strings, returning the resulting string. Total combined length (excluding null terminator) is returned in total_length if it is non-null. */ diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c index 12244f6644..4817e00454 100644 --- a/src/core/surface/byte_buffer.c +++ b/src/core/surface/byte_buffer.c @@ -35,25 +35,31 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) { +grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, + size_t nslices) { + return grpc_raw_compressed_byte_buffer_create(slices, nslices, + GRPC_COMPRESS_NONE); +} + +grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create( + gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) { size_t i; grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer)); - - bb->type = GRPC_BB_SLICE_BUFFER; - gpr_slice_buffer_init(&bb->data.slice_buffer); + bb->type = GRPC_BB_RAW; + bb->data.raw.compression = compression; + gpr_slice_buffer_init(&bb->data.raw.slice_buffer); for (i = 0; i < nslices; i++) { gpr_slice_ref(slices[i]); - gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]); + gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]); } - return bb; } grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { switch (bb->type) { - case GRPC_BB_SLICE_BUFFER: - return grpc_byte_buffer_create(bb->data.slice_buffer.slices, - bb->data.slice_buffer.count); + case GRPC_BB_RAW: + return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices, + bb->data.raw.slice_buffer.count); } gpr_log(GPR_INFO, "should never get here"); abort(); @@ -63,8 +69,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { if (!bb) return; switch (bb->type) { - case GRPC_BB_SLICE_BUFFER: - gpr_slice_buffer_destroy(&bb->data.slice_buffer); + case GRPC_BB_RAW: + gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer); break; } free(bb); @@ -72,8 +78,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) { switch (bb->type) { - case GRPC_BB_SLICE_BUFFER: - return bb->data.slice_buffer.length; + case GRPC_BB_RAW: + return bb->data.raw.slice_buffer.length; } gpr_log(GPR_ERROR, "should never reach here"); abort(); diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c index fd5289bac3..283db83833 100644 --- a/src/core/surface/byte_buffer_reader.c +++ b/src/core/surface/byte_buffer_reader.c @@ -33,42 +33,73 @@ #include <grpc/byte_buffer_reader.h> +#include <grpc/compression.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> #include <grpc/byte_buffer.h> -grpc_byte_buffer_reader *grpc_byte_buffer_reader_create( - grpc_byte_buffer *buffer) { - grpc_byte_buffer_reader *reader = malloc(sizeof(grpc_byte_buffer_reader)); - reader->buffer = buffer; +#include "src/core/compression/message_compress.h" + +static int is_compressed(grpc_byte_buffer *buffer) { switch (buffer->type) { - case GRPC_BB_SLICE_BUFFER: + case GRPC_BB_RAW: + if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) { + return 0 /* GPR_FALSE */; + } + break; + } + return 1 /* GPR_TRUE */; +} + +void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, + grpc_byte_buffer *buffer) { + gpr_slice_buffer decompressed_slices_buffer; + reader->buffer_in = buffer; + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: + gpr_slice_buffer_init(&decompressed_slices_buffer); + if (is_compressed(reader->buffer_in)) { + grpc_msg_decompress(reader->buffer_in->data.raw.compression, + &reader->buffer_in->data.raw.slice_buffer, + &decompressed_slices_buffer); + reader->buffer_out = + grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); + gpr_slice_buffer_destroy(&decompressed_slices_buffer); + } else { /* not compressed, use the input buffer as output */ + reader->buffer_out = reader->buffer_in; + } reader->current.index = 0; + break; + } +} + +void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) { + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: + /* keeping the same if-else structure as in the init function */ + if (is_compressed(reader->buffer_in)) { + grpc_byte_buffer_destroy(reader->buffer_out); + } + break; } - return reader; } int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, gpr_slice *slice) { - grpc_byte_buffer *buffer = reader->buffer; - gpr_slice_buffer *slice_buffer; - switch (buffer->type) { - case GRPC_BB_SLICE_BUFFER: - slice_buffer = &buffer->data.slice_buffer; + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: { + gpr_slice_buffer *slice_buffer; + slice_buffer = &reader->buffer_out->data.raw.slice_buffer; if (reader->current.index < slice_buffer->count) { *slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]); reader->current.index += 1; return 1; - } else { - return 0; } break; + } } return 0; } - -void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) { - free(reader); -} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index eea02211ae..9d8913f5b0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -42,6 +42,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <assert.h> #include <stdio.h> @@ -98,6 +99,8 @@ typedef enum { /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, + /* Status came from the server sending status */ + STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT } status_source; @@ -188,6 +191,7 @@ struct grpc_call { and a strong upper bound of a count of masters to be calculated. */ gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; + gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT]; reqinfo_master masters[GRPC_IOREQ_OP_COUNT]; /* Dynamic array of ioreq's that have completed: the count of @@ -231,6 +235,7 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; + gpr_uint32 incoming_message_flags; grpc_iomgr_closure destroy_closure; }; @@ -590,10 +595,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, call->write_state = WRITE_STATE_WRITE_CLOSED; } break; + case GRPC_IOREQ_SEND_STATUS: + if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != + NULL) { + grpc_mdstr_unref( + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; + } + break; case GRPC_IOREQ_RECV_CLOSE: case GRPC_IOREQ_SEND_INITIAL_METADATA: case GRPC_IOREQ_SEND_TRAILING_METADATA: - case GRPC_IOREQ_SEND_STATUS: case GRPC_IOREQ_SEND_CLOSE: break; case GRPC_IOREQ_RECV_STATUS: @@ -677,7 +690,7 @@ static void call_on_done_send(void *pc, int success) { static void finish_message(grpc_call *call) { /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create( + grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( call->incoming_message.slices, call->incoming_message.count); gpr_slice_buffer_reset_and_unref(&call->incoming_message); @@ -711,6 +724,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { } else if (msg.length > 0) { call->reading_message = 1; call->incoming_message_length = msg.length; + call->incoming_message_flags = msg.flags; return 1; } else { finish_message(call); @@ -800,7 +814,7 @@ static void call_on_done_recv(void *pc, int success) { unlock(call); GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); - GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); + GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0); } static int prepare_application_metadata(grpc_call *call, size_t count, @@ -847,9 +861,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, size_t i; switch (byte_buffer->type) { - case GRPC_BB_SLICE_BUFFER: - for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { - gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + case GRPC_BB_RAW: + for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i]; gpr_slice_ref(slice); grpc_sopb_add_slice(sopb, slice); } @@ -859,9 +873,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; + gpr_uint32 flags; grpc_metadata_batch mdb; size_t i; - char status_str[GPR_LTOA_MIN_BUFSIZE]; GPR_ASSERT(op->send_ops == NULL); switch (call->write_state) { @@ -885,8 +899,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { case WRITE_STATE_STARTED: if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE]; grpc_sopb_add_begin_message( - &call->send_ops, grpc_byte_buffer_length(data.send_message), 0); + &call->send_ops, grpc_byte_buffer_length(data.send_message), flags); copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops); op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE; @@ -905,13 +920,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; - gpr_ltoa(data.send_status.code, status_str); grpc_metadata_batch_add_tail( &mdb, &call->status_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); + grpc_channel_get_reffed_status_elem(call->channel, + data.send_status.code)); if (data.send_status.details) { grpc_metadata_batch_add_tail( &mdb, &call->details_link, @@ -919,8 +931,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { call->metadata_context, grpc_mdstr_ref( grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + data.send_status.details)); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; } grpc_sopb_add_metadata(&call->send_ops, mdb); } @@ -1020,9 +1033,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, GRPC_CALL_ERROR_INVALID_METADATA); } } + if (op == GRPC_IOREQ_SEND_STATUS) { + set_status_code(call, STATUS_FROM_SERVER_STATUS, + reqs[i].data.send_status.code); + if (reqs[i].data.send_status.details) { + set_status_details(call, STATUS_FROM_SERVER_STATUS, + grpc_mdstr_ref(reqs[i].data.send_status.details)); + } + } have_ops |= 1u << op; call->request_data[op] = data; + call->request_flags[op] = reqs[i].flags; call->request_set[op] = set; } @@ -1239,6 +1261,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } +static int are_write_flags_valid(gpr_uint32 flags) { + /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ + const gpr_uint32 allowed_write_positions = + (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); + const gpr_uint32 invalid_positions = ~allowed_write_positions; + return !(flags & invalid_positions); +} + grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag) { grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; @@ -1261,30 +1291,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op = &ops[in]; switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; req->data.send_metadata.count = op->data.send_initial_metadata.count; req->data.send_metadata.metadata = op->data.send_initial_metadata.metadata; + req->flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: + if (!are_write_flags_valid(op->flags)) { + return GRPC_CALL_ERROR_INVALID_FLAGS; + } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; + req->flags = ops->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; + req->flags = op->flags; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (call->is_client) { return GRPC_CALL_ERROR_NOT_ON_CLIENT; } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; + req->flags = op->flags; req->data.send_metadata.count = op->data.send_status_from_server.trailing_metadata_count; req->data.send_metadata.metadata = @@ -1293,29 +1336,42 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = - op->data.send_status_from_server.status_details; + op->data.send_status_from_server.status_details != NULL + ? grpc_mdstr_from_string( + call->metadata_context, + op->data.send_status_from_server.status_details) + : NULL; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; break; case GRPC_OP_RECV_INITIAL_METADATA: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; + req->flags = op->flags; break; case GRPC_OP_RECV_MESSAGE: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_MESSAGE; req->data.recv_message = op->data.recv_message; + req->flags = op->flags; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!call->is_client) { return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_STATUS; + req->flags = op->flags; req->data.recv_status.set_value = set_status_value_directly; req->data.recv_status.user_data = op->data.recv_status_on_client.status; req = &reqs[out++]; @@ -1333,8 +1389,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: + /* Flag validation: currently allow no flags */ + if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_STATUS; + req->flags = op->flags; req->data.recv_status.set_value = set_cancelled_value; req->data.recv_status.user_data = op->data.recv_close_on_server.cancelled; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 17db8c2cdc..fb3662b50d 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -72,13 +72,14 @@ typedef union { grpc_byte_buffer *send_message; struct { grpc_status_code code; - const char *details; + grpc_mdstr *details; } send_status; } grpc_ioreq_data; typedef struct { grpc_ioreq_op op; grpc_ioreq_data data; + gpr_uint32 flags; /**< A copy of the write flags from grpc_op */ } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c index 9905401bee..55663298c9 100644 --- a/src/core/surface/call_log_batch.c +++ b/src/core/surface/call_log_batch.c @@ -35,6 +35,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> int grpc_trace_batch = 0; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 70485d71da..9ecdd374a9 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -37,12 +37,20 @@ #include <string.h> #include "src/core/iomgr/iomgr.h" +#include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/client.h" #include "src/core/surface/init.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. + * Avoids needing to take a metadata context lock for sending status + * if the status code is <= NUM_CACHED_STATUS_ELEMS. + * Sized to allow the most commonly used codes to fit in + * (OK, Cancelled, Unknown). */ +#define NUM_CACHED_STATUS_ELEMS 3 + typedef struct registered_call { grpc_mdelem *path; grpc_mdelem *authority; @@ -54,10 +62,13 @@ struct grpc_channel { gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdctx *metadata_context; + /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + /** mdelem for grpc-status: 0 thru grpc-status: 2 */ + grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS]; gpr_mu registered_call_mu; registered_call *registered_calls; @@ -88,6 +99,13 @@ grpc_channel *grpc_channel_create_from_filters( channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); + for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { + char buf[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(i, buf); + channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_ref(channel->grpc_status_string), + grpc_mdstr_from_string(mdctx, buf)); + } channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, @@ -181,7 +199,11 @@ void grpc_channel_internal_ref(grpc_channel *c) { static void destroy_channel(void *p, int ok) { grpc_channel *channel = p; + size_t i; grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); + for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { + grpc_mdelem_unref(channel->grpc_status_elem[i]); + } grpc_mdstr_unref(channel->grpc_status_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); @@ -200,7 +222,7 @@ static void destroy_channel(void *p, int ok) { #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) { - gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, + gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, channel->refs.count, channel->refs.count - 1, reason); #else void grpc_channel_internal_unref(grpc_channel *channel) { @@ -247,6 +269,18 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } +grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { + if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { + return grpc_mdelem_ref(channel->grpc_status_elem[i]); + } else { + char tmp[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(i, tmp); + return grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string), + grpc_mdstr_from_string(channel->metadata_context, tmp)); + } +} + grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index d1f62f2598..ba3c0abac9 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -40,8 +40,18 @@ grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t count, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); +/** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); + +/** Get a (borrowed) pointer to the channel wide metadata context */ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); + +/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of + status_code. + + The returned elem is owned by the caller. */ +grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, + int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index a49b754d9a..d069a04a9a 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -53,6 +53,7 @@ #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> @@ -137,7 +138,8 @@ static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { request *r = rp; /* if we're not still the active request, abort */ - if (!grpc_client_setup_request_should_continue(r->cs_request, "on_resolved")) { + if (!grpc_client_setup_request_should_continue(r->cs_request, + "on_resolved")) { if (resolved) { grpc_resolved_addresses_destroy(resolved); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 6808c976e1..bd0fabf9da 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -73,6 +73,7 @@ struct grpc_completion_queue { event *queue; /* Fixed size chained hash table of events for pluck() */ event *buckets[NUM_TAG_BUCKETS]; + int is_server_cq; }; grpc_completion_queue *grpc_completion_queue_create(void) { @@ -86,21 +87,10 @@ grpc_completion_queue *grpc_completion_queue_create(void) { return cc; } - - - - - - - - - - - - #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) { - gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); + gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, + (int)cc->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { #endif @@ -114,7 +104,8 @@ static void on_pollset_destroy_done(void *arg) { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) { - gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); + gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, + (int)cc->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif @@ -333,3 +324,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_time_add(gpr_now(), gpr_time_from_millis(100))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } + +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } + +int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 2249d0e789..e76910c00b 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -63,4 +63,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); +void grpc_cq_mark_server_cq(grpc_completion_queue *cc); +int grpc_cq_is_server_cq(grpc_completion_queue *cc); + #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 448bb1162b..33cd4a43aa 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -37,6 +37,7 @@ #include "src/core/support/string.h" #include <grpc/byte_buffer.h> +#include <grpc/support/string_util.h> static void addhdr(gpr_strvec *buf, grpc_event *ev) { char *tmp; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index ac6871c6f2..ca61a38a35 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -78,6 +78,7 @@ void grpc_shutdown(void) { grpc_iomgr_shutdown(); census_shutdown(); grpc_timers_global_destroy(); + grpc_tracer_shutdown(); } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 6c07b01544..b667128aef 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -118,9 +118,9 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "lame-client", + lame_start_transport_op, channel_op, sizeof(call_data), + init_call_elem, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, "lame-client", }; grpc_channel *grpc_lame_client_channel_create(void) { diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 3875eb0614..fae3e4e90a 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -56,6 +56,7 @@ #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> #include "src/core/tsi/transport_security_interface.h" @@ -96,7 +97,8 @@ static void on_secure_transport_setup_done(void *rp, if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); done(r, 0); - } else if (grpc_client_setup_cb_begin(r->cs_request, "on_secure_transport_setup_done")) { + } else if (grpc_client_setup_cb_begin(r->cs_request, + "on_secure_transport_setup_done")) { grpc_create_chttp2_transport( r->setup->setup_callback, r->setup->setup_user_data, grpc_client_setup_get_channel_args(r->cs_request), secure_endpoint, @@ -112,7 +114,8 @@ static void on_secure_transport_setup_done(void *rp, static void on_connect(void *rp, grpc_endpoint *tcp) { request *r = rp; - if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect.secure")) { + if (!grpc_client_setup_request_should_continue(r->cs_request, + "on_connect.secure")) { if (tcp) { grpc_endpoint_shutdown(tcp); grpc_endpoint_destroy(tcp); @@ -152,7 +155,8 @@ static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { request *r = rp; /* if we're not still the active request, abort */ - if (!grpc_client_setup_request_should_continue(r->cs_request, "on_resolved.secure")) { + if (!grpc_client_setup_request_should_continue(r->cs_request, + "on_resolved.secure")) { if (resolved) { grpc_resolved_addresses_destroy(resolved); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 4cf9213e66..c8ac559a0d 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -48,6 +48,7 @@ #include "src/core/transport/metadata.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; @@ -494,7 +495,6 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); - } if (call_list_remove(calld, ALL_CALLS)) { decrement_call_count(chand); @@ -590,7 +590,8 @@ static void finish_shutdown_channel(void *p, int success) { gpr_free(sca); } -static void shutdown_channel(channel_data *chand, int send_goaway, int send_disconnect) { +static void shutdown_channel(channel_data *chand, int send_goaway, + int send_disconnect) { shutdown_channel_args *sca; GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown"); sca = gpr_malloc(sizeof(shutdown_channel_args)); @@ -708,6 +709,7 @@ void grpc_server_register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); @@ -1009,7 +1011,7 @@ void grpc_server_destroy(grpc_server *server) { listener *l; gpr_mu_lock(&server->mu); - GPR_ASSERT(server->shutdown); + GPR_ASSERT(server->shutdown || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { @@ -1080,6 +1082,9 @@ grpc_call_error grpc_server_request_call( GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); + if (!grpc_cq_is_server_cq(cq_for_notification)) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } grpc_cq_begin_op(cq_for_notification, NULL); rc.type = BATCH_CALL; rc.tag = tag; @@ -1098,6 +1103,9 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; + if (!grpc_cq_is_server_cq(cq_for_notification)) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } grpc_cq_begin_op(cq_for_notification, NULL); rc.type = REGISTERED_CALL; rc.tag = tag; @@ -1152,6 +1160,7 @@ static void begin_call(grpc_server *server, call_data *calld, rc->data.batch.details->deadline = calld->deadline; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; + r->flags = 0; r++; publish = publish_registered_or_batch; break; @@ -1159,10 +1168,12 @@ static void begin_call(grpc_server *server, call_data *calld, *rc->data.registered.deadline = calld->deadline; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; + r->flags = 0; r++; if (rc->data.registered.optional_payload) { r->op = GRPC_IOREQ_RECV_MESSAGE; r->data.recv_message = rc->data.registered.optional_payload; + r->flags = 0; r++; } publish = publish_registered_or_batch; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c85eb96a6f..e10bb6a13e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -230,7 +230,10 @@ struct transport { /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; gpr_uint8 writing; - gpr_uint8 calling_back; + /** are we calling back (via cb) with a channel-level event */ + gpr_uint8 calling_back_channel; + /** are we calling back any grpc_transport_op completion events */ + gpr_uint8 calling_back_ops; gpr_uint8 destroying; gpr_uint8 closed; error_state error_state; @@ -369,7 +372,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); static int prepare_callbacks(transport *t); -static void run_callbacks(transport *t, const grpc_transport_callbacks *cb); +static void run_callbacks(transport *t); static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb); static int prepare_write(transport *t); @@ -577,7 +580,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, } gpr_mu_lock(&t->mu); - t->calling_back = 1; + t->calling_back_channel = 1; ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); @@ -586,7 +589,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, lock(t); t->cb = sr.callbacks; t->cb_user_data = sr.user_data; - t->calling_back = 0; + t->calling_back_channel = 0; if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); @@ -607,7 +610,7 @@ static void destroy_transport(grpc_transport *gt) { We need to be not writing as cancellation finalization may produce some callbacks that NEED to be made to close out some streams when t->writing becomes 0. */ - while (t->calling_back || t->writing) { + while (t->calling_back_channel || t->writing) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } drop_connection(t); @@ -847,28 +850,29 @@ static void unlock(transport *t) { finish_reads(t); /* gather any callbacks that need to be made */ - if (!t->calling_back) { - t->calling_back = perform_callbacks = prepare_callbacks(t); - if (cb) { - if (t->error_state == ERROR_STATE_SEEN && !t->writing) { - call_closed = 1; - t->calling_back = 1; - t->cb = NULL; /* no more callbacks */ - t->error_state = ERROR_STATE_NOTIFIED; - } - if (t->num_pending_goaways) { - goaways = t->pending_goaways; - num_goaways = t->num_pending_goaways; - t->pending_goaways = NULL; - t->num_pending_goaways = 0; - t->cap_pending_goaways = 0; - t->calling_back = 1; - } - } + if (!t->calling_back_ops) { + t->calling_back_ops = perform_callbacks = prepare_callbacks(t); + if (perform_callbacks) ref_transport(t); } - if (perform_callbacks || call_closed || num_goaways) { - ref_transport(t); + if (!t->calling_back_channel && cb) { + if (t->error_state == ERROR_STATE_SEEN && !t->writing) { + call_closed = 1; + t->calling_back_channel = 1; + t->cb = NULL; /* no more callbacks */ + t->error_state = ERROR_STATE_NOTIFIED; + } + if (t->num_pending_goaways) { + goaways = t->pending_goaways; + num_goaways = t->num_pending_goaways; + t->pending_goaways = NULL; + t->num_pending_goaways = 0; + t->cap_pending_goaways = 0; + t->calling_back_channel = 1; + } + if (call_closed || num_goaways) { + ref_transport(t); + } } /* finally unlock */ @@ -882,7 +886,11 @@ static void unlock(transport *t) { } if (perform_callbacks) { - run_callbacks(t, cb); + run_callbacks(t); + lock(t); + t->calling_back_ops = 0; + unlock(t); + unref_transport(t); } if (call_closed) { @@ -895,9 +903,9 @@ static void unlock(transport *t) { perform_write(t, ep); } - if (perform_callbacks || call_closed || num_goaways) { + if (call_closed || num_goaways) { lock(t); - t->calling_back = 0; + t->calling_back_channel = 0; if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); unref_transport(t); @@ -1009,10 +1017,12 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf); + s->send_closed != DONT_SEND_CLOSED, s->id, + &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) { - gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR)); + gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create( + s->id, GRPC_CHTTP2_NO_ERROR)); } if (s->send_closed != DONT_SEND_CLOSED) { stream_list_join(t, s, WRITTEN_CLOSED); @@ -1075,12 +1085,12 @@ static void perform_write(transport *t, grpc_endpoint *ep) { } } -static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) { +static void add_goaway(transport *t, gpr_uint32 goaway_error, + gpr_slice goaway_text) { if (t->num_pending_goaways == t->cap_pending_goaways) { t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); - t->pending_goaways = - gpr_realloc(t->pending_goaways, - sizeof(pending_goaway) * t->cap_pending_goaways); + t->pending_goaways = gpr_realloc( + t->pending_goaways, sizeof(pending_goaway) * t->cap_pending_goaways); } t->pending_goaways[t->num_pending_goaways].status = grpc_chttp2_http2_error_to_grpc_status(goaway_error); @@ -1088,13 +1098,12 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_t t->num_pending_goaways++; } - static void maybe_start_some_streams(transport *t) { /* start streams where we have free stream ids and free concurrency */ - while ( - t->next_stream_id <= MAX_CLIENT_STREAM_ID && - grpc_chttp2_stream_map_size(&t->stream_map) < - t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { + while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && + grpc_chttp2_stream_map_size(&t->stream_map) < + t->settings[PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) return; @@ -1102,7 +1111,9 @@ static void maybe_start_some_streams(transport *t) { t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { - add_goaway(t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); + add_goaway( + t, GRPC_CHTTP2_NO_ERROR, + gpr_slice_from_copied_string("Exceeded sequence number limit")); } GPR_ASSERT(s->id == 0); @@ -1122,7 +1133,10 @@ static void maybe_start_some_streams(transport *t) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) return; - cancel_stream(t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, 0); + cancel_stream( + t, s, GRPC_STATUS_UNAVAILABLE, + grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, + 0); } } @@ -1184,7 +1198,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { op_closure c; c.cb = op->on_consumed; c.user_data = op->on_consumed_user_data; - schedule_cb(t, c, 1); + schedule_cb(t, c, 1); } } @@ -1279,8 +1293,8 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, /* synthesize a status if we don't believe we'll get one */ gpr_ltoa(local_status, buffer); add_incoming_metadata( - t, s, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", + buffer)); if (!optional_message) { switch (local_status) { case GRPC_STATUS_CANCELLED: @@ -1521,7 +1535,8 @@ static int init_header_frame_parser(transport *t, int is_continuation) { t->last_incoming_stream_id, t->incoming_stream_id); return init_skip_frame(t, 1); } else if ((t->incoming_stream_id & 1) == 0) { - gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id); + gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", + t->incoming_stream_id); return init_skip_frame(t, 1); } t->incoming_stream = NULL; @@ -1581,10 +1596,10 @@ static int init_ping_parser(transport *t) { } static int init_rst_stream_parser(transport *t) { - int ok = GRPC_CHTTP2_PARSE_OK == - grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream, - t->incoming_frame_size, - t->incoming_frame_flags); + int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( + &t->simple_parsers.rst_stream, + t->incoming_frame_size, + t->incoming_frame_flags); if (!ok) { drop_connection(t); } @@ -1607,12 +1622,22 @@ static int init_goaway_parser(transport *t) { } static int init_settings_frame_parser(transport *t) { - int ok = GRPC_CHTTP2_PARSE_OK == - grpc_chttp2_settings_parser_begin_frame( - &t->simple_parsers.settings, t->incoming_frame_size, - t->incoming_frame_flags, t->settings[PEER_SETTINGS]); + int ok; + + if (t->incoming_stream_id != 0) { + gpr_log(GPR_ERROR, "settings frame received for stream %d", + t->incoming_stream_id); + drop_connection(t); + return 0; + } + + ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_settings_parser_begin_frame( + &t->simple_parsers.settings, t->incoming_frame_size, + t->incoming_frame_flags, t->settings[PEER_SETTINGS]); if (!ok) { drop_connection(t); + return 0; } if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS], @@ -1674,7 +1699,7 @@ static void add_metadata_batch(transport *t, stream *s) { we can reconstitute the list. We can't do list building here as later incoming metadata may reallocate the underlying array. */ - b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count; + b.list.tail = (void *)(gpr_intptr)s->incoming_metadata_count; b.garbage.head = b.garbage.tail = NULL; b.deadline = s->incoming_deadline; s->incoming_deadline = gpr_inf_future; @@ -2032,7 +2057,7 @@ static void patch_metadata_ops(stream *s) { int found_metadata = 0; /* rework the array of metadata into a linked list, making use - of the breadcrumbs we left in metadata batches during + of the breadcrumbs we left in metadata batches during add_metadata_batch */ for (i = 0; i < nops; i++) { grpc_stream_op *op = &ops[i]; @@ -2048,11 +2073,11 @@ static void patch_metadata_ops(stream *s) { op->data.metadata.list.head = &s->incoming_metadata[mdidx]; op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1]; for (j = mdidx + 1; j < last_mdidx; j++) { - s->incoming_metadata[j].prev = &s->incoming_metadata[j-1]; - s->incoming_metadata[j-1].next = &s->incoming_metadata[j]; + s->incoming_metadata[j].prev = &s->incoming_metadata[j - 1]; + s->incoming_metadata[j - 1].next = &s->incoming_metadata[j]; } s->incoming_metadata[mdidx].prev = NULL; - s->incoming_metadata[last_mdidx-1].next = NULL; + s->incoming_metadata[last_mdidx - 1].next = NULL; /* track where we're up to */ mdidx = last_mdidx; } @@ -2064,7 +2089,8 @@ static void patch_metadata_ops(stream *s) { size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count; GPR_ASSERT(mdidx < s->incoming_metadata_count); s->incoming_metadata = gpr_malloc(copy_bytes); - memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes); + memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, + copy_bytes); s->incoming_metadata_count = s->incoming_metadata_capacity = new_count; } else { s->incoming_metadata = NULL; @@ -2101,7 +2127,6 @@ static void finish_reads(transport *t) { schedule_cb(t, s->recv_done_closure, 1); } } - } static void schedule_cb(transport *t, op_closure closure, int success) { @@ -2124,7 +2149,7 @@ static int prepare_callbacks(transport *t) { return t->executing_callbacks.count > 0; } -static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { +static void run_callbacks(transport *t) { size_t i; for (i = 0; i < t->executing_callbacks.count; i++) { op_closure c = t->executing_callbacks.callbacks[i]; diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index c80d67823f..e75b449e12 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) { if (ctx->refs == 0) { /* uncomment if you're having trouble diagnosing an mdelem leak to make things clearer (slows down destruction a lot, however) */ - gc_mdtab(ctx); + /* gc_mdtab(ctx); */ if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) { discard_metadata(ctx); } diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 95497a3cc8..e080701e2d 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -58,11 +58,18 @@ typedef enum grpc_stream_op_code { GRPC_OP_SLICE } grpc_stream_op_code; -/* Arguments for GRPC_OP_BEGIN */ +/** Internal bit flag for grpc_begin_message's \a flags signaling the use of + * compression for the message */ +#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) +/** Mask of all valid internal flags. */ +#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) + +/* Arguments for GRPC_OP_BEGIN_MESSAGE */ typedef struct grpc_begin_message { /* How many bytes of data will this message contain */ gpr_uint32 length; - /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */ + /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits, + * GRPC_WRITE_INTERNAL_* for the internal ones. */ gpr_uint32 flags; } grpc_begin_message; @@ -126,10 +133,8 @@ typedef struct grpc_stream_op { } data; } grpc_stream_op; -/* A stream op buffer is a wrapper around stream operations that is dynamically - extendable. - TODO(ctiller): inline a few elements into the struct, to avoid common case - per-call allocations. */ +/** A stream op buffer is a wrapper around stream operations that is + * dynamically extendable. */ typedef struct grpc_stream_op_buffer { grpc_stream_op *ops; size_t nops; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 11d5a9d780..2420a54396 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -39,6 +39,7 @@ #include "src/core/support/string.h" #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> /* These routines are here to facilitate debugging - they produce string diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 63b4c42131..6156a39d09 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -54,8 +54,16 @@ #define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_UPPER_BOUND 16384 #define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_LOWER_BOUND 1024 + +/* Putting a macro like this and littering the source file with #if is really + bad practice. + TODO(jboeuf): refactor all the #if / #endif in a separate module. */ +#ifndef TSI_OPENSSL_ALPN_SUPPORT +#define TSI_OPENSSL_ALPN_SUPPORT 1 +#endif + /* TODO(jboeuf): I have not found a way to get this number dynamically from the - * SSL structure. This is what we would ultimately want though... */ + SSL structure. This is what we would ultimately want though... */ #define TSI_SSL_MAX_PROTECTION_OVERHEAD 100 /* --- Structure definitions. ---*/ @@ -70,6 +78,8 @@ struct tsi_ssl_handshaker_factory { typedef struct { tsi_ssl_handshaker_factory base; SSL_CTX* ssl_context; + unsigned char* alpn_protocol_list; + size_t alpn_protocol_list_length; } tsi_ssl_client_handshaker_factory; typedef struct { @@ -841,7 +851,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer( static tsi_result ssl_handshaker_extract_peer(tsi_handshaker* self, tsi_peer* peer) { tsi_result result = TSI_OK; - const unsigned char* alpn_selected; + const unsigned char* alpn_selected = NULL; unsigned int alpn_selected_len; tsi_ssl_handshaker* impl = (tsi_ssl_handshaker*)self; X509* peer_cert = SSL_get_peer_certificate(impl->ssl); @@ -850,7 +860,14 @@ static tsi_result ssl_handshaker_extract_peer(tsi_handshaker* self, X509_free(peer_cert); if (result != TSI_OK) return result; } +#if TSI_OPENSSL_ALPN_SUPPORT SSL_get0_alpn_selected(impl->ssl, &alpn_selected, &alpn_selected_len); +#endif /* TSI_OPENSSL_ALPN_SUPPORT */ + if (alpn_selected == NULL) { + /* Try npn. */ + SSL_get0_next_proto_negotiated(impl->ssl, &alpn_selected, + &alpn_selected_len); + } if (alpn_selected != NULL) { size_t i; tsi_peer_property* new_properties = @@ -1012,6 +1029,32 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, return TSI_OK; } +static int select_protocol_list(const unsigned char** out, + unsigned char* outlen, + const unsigned char* client_list, + unsigned int client_list_len, + const unsigned char* server_list, + unsigned int server_list_len) { + const unsigned char* client_current = client_list; + while ((unsigned int)(client_current - client_list) < client_list_len) { + unsigned char client_current_len = *(client_current++); + const unsigned char* server_current = server_list; + while ((server_current >= server_list) && + (gpr_uintptr)(server_current - server_list) < server_list_len) { + unsigned char server_current_len = *(server_current++); + if ((client_current_len == server_current_len) && + !memcmp(client_current, server_current, server_current_len)) { + *out = server_current; + *outlen = server_current_len; + return SSL_TLSEXT_ERR_OK; + } + server_current += server_current_len; + } + client_current += client_current_len; + } + return SSL_TLSEXT_ERR_NOACK; +} + /* --- tsi_ssl__client_handshaker_factory methods implementation. --- */ static tsi_result ssl_client_handshaker_factory_create_handshaker( @@ -1027,10 +1070,21 @@ static void ssl_client_handshaker_factory_destroy( tsi_ssl_handshaker_factory* self) { tsi_ssl_client_handshaker_factory* impl = (tsi_ssl_client_handshaker_factory*)self; - SSL_CTX_free(impl->ssl_context); + if (impl->ssl_context != NULL) SSL_CTX_free(impl->ssl_context); + if (impl->alpn_protocol_list != NULL) free(impl->alpn_protocol_list); free(impl); } +static int client_handshaker_factory_npn_callback( + SSL* ssl, unsigned char** out, unsigned char* outlen, + const unsigned char* in, unsigned int inlen, void* arg) { + tsi_ssl_client_handshaker_factory* factory = + (tsi_ssl_client_handshaker_factory*)arg; + return select_protocol_list((const unsigned char**)out, outlen, + factory->alpn_protocol_list, + factory->alpn_protocol_list_length, in, inlen); +} + /* --- tsi_ssl_server_handshaker_factory methods implementation. --- */ static tsi_result ssl_server_handshaker_factory_create_handshaker( @@ -1134,30 +1188,25 @@ static int ssl_server_handshaker_factory_servername_callback(SSL* ssl, int* ap, return SSL_TLSEXT_ERR_ALERT_WARNING; } +#if TSI_OPENSSL_ALPN_SUPPORT static int server_handshaker_factory_alpn_callback( SSL* ssl, const unsigned char** out, unsigned char* outlen, const unsigned char* in, unsigned int inlen, void* arg) { tsi_ssl_server_handshaker_factory* factory = (tsi_ssl_server_handshaker_factory*)arg; - const unsigned char* client_current = in; - while ((unsigned int)(client_current - in) < inlen) { - unsigned char client_current_len = *(client_current++); - const unsigned char* server_current = factory->alpn_protocol_list; - while ((server_current >= factory->alpn_protocol_list) && - (gpr_uintptr)(server_current - factory->alpn_protocol_list) < - factory->alpn_protocol_list_length) { - unsigned char server_current_len = *(server_current++); - if ((client_current_len == server_current_len) && - !memcmp(client_current, server_current, server_current_len)) { - *out = server_current; - *outlen = server_current_len; - return SSL_TLSEXT_ERR_OK; - } - server_current += server_current_len; - } - client_current += client_current_len; - } - return SSL_TLSEXT_ERR_NOACK; + return select_protocol_list(out, outlen, in, inlen, + factory->alpn_protocol_list, + factory->alpn_protocol_list_length); +} +#endif /* TSI_OPENSSL_ALPN_SUPPORT */ + +static int server_handshaker_factory_npn_advertised_callback( + SSL* ssl, const unsigned char** out, unsigned int* outlen, void* arg) { + tsi_ssl_server_handshaker_factory* factory = + (tsi_ssl_server_handshaker_factory*)arg; + *out = factory->alpn_protocol_list; + *outlen = factory->alpn_protocol_list_length; + return SSL_TLSEXT_ERR_OK; } /* --- tsi_ssl_handshaker_factory constructors. --- */ @@ -1184,6 +1233,14 @@ tsi_result tsi_create_ssl_client_handshaker_factory( gpr_log(GPR_ERROR, "Could not create ssl context."); return TSI_INVALID_ARGUMENT; } + + impl = calloc(1, sizeof(tsi_ssl_client_handshaker_factory)); + if (impl == NULL) { + SSL_CTX_free(ssl_context); + return TSI_OUT_OF_RESOURCES; + } + impl->ssl_context = ssl_context; + do { result = populate_ssl_context(ssl_context, pem_private_key, pem_private_key_size, @@ -1197,41 +1254,33 @@ tsi_result tsi_create_ssl_client_handshaker_factory( } if (num_alpn_protocols != 0) { - unsigned char* alpn_protocol_list = NULL; - size_t alpn_protocol_list_length = 0; - int ssl_failed; result = build_alpn_protocol_name_list( alpn_protocols, alpn_protocols_lengths, num_alpn_protocols, - &alpn_protocol_list, &alpn_protocol_list_length); + &impl->alpn_protocol_list, &impl->alpn_protocol_list_length); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Building alpn list failed with error %s.", tsi_result_to_string(result)); - free(alpn_protocol_list); break; } - ssl_failed = SSL_CTX_set_alpn_protos(ssl_context, alpn_protocol_list, - alpn_protocol_list_length); - free(alpn_protocol_list); - if (ssl_failed) { +#if TSI_OPENSSL_ALPN_SUPPORT + if (SSL_CTX_set_alpn_protos(ssl_context, impl->alpn_protocol_list, + impl->alpn_protocol_list_length)) { gpr_log(GPR_ERROR, "Could not set alpn protocol list to context."); result = TSI_INVALID_ARGUMENT; break; } +#endif /* TSI_OPENSSL_ALPN_SUPPORT */ + SSL_CTX_set_next_proto_select_cb( + ssl_context, client_handshaker_factory_npn_callback, impl); } } while (0); if (result != TSI_OK) { - SSL_CTX_free(ssl_context); + ssl_client_handshaker_factory_destroy(&impl->base); return result; } SSL_CTX_set_verify(ssl_context, SSL_VERIFY_PEER, NULL); /* TODO(jboeuf): Add revocation verification. */ - impl = calloc(1, sizeof(tsi_ssl_client_handshaker_factory)); - if (impl == NULL) { - SSL_CTX_free(ssl_context); - return TSI_OUT_OF_RESOURCES; - } - impl->ssl_context = ssl_context; impl->base.create_handshaker = ssl_client_handshaker_factory_create_handshaker; impl->base.destroy = ssl_client_handshaker_factory_destroy; @@ -1322,8 +1371,13 @@ tsi_result tsi_create_ssl_server_handshaker_factory( impl->ssl_contexts[i], ssl_server_handshaker_factory_servername_callback); SSL_CTX_set_tlsext_servername_arg(impl->ssl_contexts[i], impl); +#if TSI_OPENSSL_ALPN_SUPPORT SSL_CTX_set_alpn_select_cb(impl->ssl_contexts[i], server_handshaker_factory_alpn_callback, impl); +#endif /* TSI_OPENSSL_ALPN_SUPPORT */ + SSL_CTX_set_next_protos_advertised_cb( + impl->ssl_contexts[i], + server_handshaker_factory_npn_advertised_callback, impl); } while (0); if (result != TSI_OK) { |