diff options
author | Yuchen Zeng <zyc@google.com> | 2016-11-30 15:51:25 -0800 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-11-30 15:51:25 -0800 |
commit | b0023d25dc783ba77164c03a39bb7dcc7e446fe8 (patch) | |
tree | 5afae5cf86bc564fa08dcff01b4fdee0f084d438 /src/core/lib | |
parent | 19d7bab431d08ae721ffd651926e05320371fff5 (diff) | |
parent | 9832b9b2de8ce14dbce90d4cc90336984e8d83c3 (diff) |
Merge remote-tracking branch 'upstream/master' into cares_buildin
Diffstat (limited to 'src/core/lib')
57 files changed, 1025 insertions, 2919 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index cfc072c0b5..401a2ad4fe 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -298,6 +298,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( } } +grpc_channel_args *grpc_channel_args_set_socket_mutator( + grpc_channel_args *a, grpc_socket_mutator *mutator) { + grpc_arg tmp = grpc_socket_mutator_to_arg(mutator); + return grpc_channel_args_copy_and_add(a, &tmp, 1); +} + int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b) { int c = GPR_ICMP(a->num_args, b->num_args); diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 1e05303471..88fc0e37a3 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -36,6 +36,7 @@ #include <grpc/compression.h> #include <grpc/grpc.h> +#include "src/core/lib/iomgr/socket_mutator.h" // Channel args are intentionally immutable, to avoid the need for locking. @@ -100,6 +101,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b); +/** Returns a channel arg instance with socket mutator added. The socket mutator + * will perform its mutate_fd method on all file descriptors used by the + * channel. + * If \a a is non-MULL, its args are copied. */ +grpc_channel_args *grpc_channel_args_set_socket_mutator( + grpc_channel_args *a, grpc_socket_mutator *mutator); + /** Returns the value of argument \a name from \a args, or NULL if not found. */ const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, const char *name); diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 98ee03417f..999ad5f507 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -162,7 +162,8 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) { + grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline, + grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -179,7 +180,7 @@ grpc_error *grpc_call_stack_init( /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; - args.start_time = gpr_now(GPR_CLOCK_MONOTONIC); + args.start_time = start_time; for (i = 0; i < count; i++) { args.call_stack = call_stack; args.server_transport_data = transport_server_data; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index c3b662c969..004643d45f 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -231,7 +231,8 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack); + grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline, + grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index f57d7c2453..fd8b46afcb 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -36,6 +36,7 @@ #include <grpc/support/string_util.h> #include <string.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" @@ -56,6 +57,7 @@ typedef struct call_data { grpc_linked_mdelem payload_bin; grpc_metadata_batch *recv_initial_metadata; + grpc_metadata_batch *recv_trailing_metadata; uint8_t *payload_bytes; /* Vars to read data off of send_message */ @@ -69,14 +71,16 @@ typedef struct call_data { bool send_message_blocked; /** Closure to call when finished with the hc_on_recv hook */ - grpc_closure *on_done_recv; + grpc_closure *on_done_recv_initial_metadata; + grpc_closure *on_done_recv_trailing_metadata; grpc_closure *on_complete; grpc_closure *post_send; /** Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member after handling it. */ - grpc_closure hc_on_recv; + grpc_closure hc_on_recv_initial_metadata; + grpc_closure hc_on_recv_trailing_metadata; grpc_closure hc_on_complete; grpc_closure got_slice; grpc_closure send_done; @@ -106,6 +110,16 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { grpc_call_element_send_close_with_message(a->exec_ctx, a->elem, GRPC_STATUS_CANCELLED, &message); return NULL; + } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { + grpc_slice pct_decoded_msg = + grpc_permissive_percent_decode_slice(md->value->slice); + if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) { + grpc_slice_unref(pct_decoded_msg); + return md; + } else { + return grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_decoded_msg)); + } } else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { return NULL; } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) { @@ -129,8 +143,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { return md; } -static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { +static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; client_recv_filter_args a; @@ -138,7 +152,21 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, a.exec_ctx = exec_ctx; grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter, &a); - calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error); + grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, + GRPC_ERROR_REF(error)); +} + +static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, + void *user_data, grpc_error *error) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + client_recv_filter_args a; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(calld->recv_trailing_metadata, client_recv_filter, + &a); + grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, + GRPC_ERROR_REF(error)); } static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, @@ -217,12 +245,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, message, and the payload is below the size threshold, and all the data for this request is immediately available. */ grpc_mdelem *method = GRPC_MDELEM_METHOD_POST; - calld->send_message_blocked = false; if ((op->send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && op->send_message != NULL && op->send_message->length < channeld->max_payload_size_for_get) { method = GRPC_MDELEM_METHOD_GET; + /* The following write to calld->send_message_blocked isn't racy with + reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because + being here means ops->send_message is not NULL, which is primarily + guarding the read there. */ calld->send_message_blocked = true; } else if (op->send_initial_metadata_flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { @@ -281,8 +312,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->hc_on_recv; + calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata; + } + + if (op->recv_trailing_metadata != NULL) { + /* substitute our callback for the higher callback */ + calld->recv_trailing_metadata = op->recv_trailing_metadata; + calld->on_done_recv_trailing_metadata = op->on_complete; + op->on_complete = &calld->hc_on_recv_trailing_metadata; } } @@ -296,8 +334,7 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; if (op->send_message != NULL && calld->send_message_blocked) { /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. - */ + yet. The call will be forwarded by the op_complete of slice read call. */ } else { grpc_call_next_op(exec_ctx, elem, op); } @@ -308,11 +345,16 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; - calld->on_done_recv = NULL; + calld->on_done_recv_initial_metadata = NULL; + calld->on_done_recv_trailing_metadata = NULL; calld->on_complete = NULL; calld->payload_bytes = NULL; + calld->send_message_blocked = false; grpc_slice_buffer_init(&calld->slices); - grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); + grpc_closure_init(&calld->hc_on_recv_initial_metadata, + hc_on_recv_initial_metadata, elem); + grpc_closure_init(&calld->hc_on_recv_trailing_metadata, + hc_on_recv_trailing_metadata, elem); grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem); grpc_closure_init(&calld->got_slice, got_slice, elem); grpc_closure_init(&calld->send_done, send_done, elem); diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index 6a33689fec..b42ff06039 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> #include <string.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/transport/static_metadata.h" #define EXPECTED_CONTENT_TYPE "application/grpc" @@ -86,6 +87,23 @@ typedef struct { grpc_exec_ctx *exec_ctx; } server_filter_args; +static grpc_mdelem *server_filter_outgoing_metadata(void *user_data, + grpc_mdelem *md) { + if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { + grpc_slice pct_encoded_msg = grpc_percent_encode_slice( + md->value->slice, grpc_compatible_percent_encoding_unreserved_bytes); + if (grpc_slice_is_equivalent(pct_encoded_msg, md->value->slice)) { + grpc_slice_unref(pct_encoded_msg); + return md; + } else { + return grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_encoded_msg)); + } + } else { + return md; + } +} + static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { server_filter_args *a = user_data; grpc_call_element *elem = a->elem; @@ -254,7 +272,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, } } -static void hs_mutate_op(grpc_call_element *elem, +static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; @@ -290,6 +308,12 @@ static void hs_mutate_op(grpc_call_element *elem, op->on_complete = &calld->hs_on_complete; } } + + if (op->send_trailing_metadata) { + server_filter_args a = {elem, exec_ctx}; + grpc_metadata_batch_filter(op->send_trailing_metadata, + server_filter_outgoing_metadata, &a); + } } static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -297,7 +321,7 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); GPR_TIMER_BEGIN("hs_start_transport_op", 0); - hs_mutate_op(elem, op); + hs_mutate_op(exec_ctx, elem, op); grpc_call_next_op(exec_ctx, elem, op); GPR_TIMER_END("hs_start_transport_op", 0); } diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 1331fe1c65..1cf68d790d 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -34,16 +34,14 @@ #include <limits.h> #include <string.h> +#include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/transport/method_config.h" - -#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited. -// The protobuf library will (by default) start warning at 100 megs. -#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/service_config.h" typedef struct message_size_limits { int max_send_size; @@ -56,30 +54,29 @@ static void* message_size_limits_copy(void* value) { return new_value; } -static int message_size_limits_cmp(void* value1, void* value2) { - const message_size_limits* v1 = value1; - const message_size_limits* v2 = value2; - if (v1->max_send_size > v2->max_send_size) return 1; - if (v1->max_send_size < v2->max_send_size) return -1; - if (v1->max_recv_size > v2->max_recv_size) return 1; - if (v1->max_recv_size < v2->max_recv_size) return -1; - return 0; -} - static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { - gpr_free, message_size_limits_copy, message_size_limits_cmp}; - -static void* method_config_convert_value( - const grpc_method_config* method_config) { + gpr_free, message_size_limits_copy}; + +static void* message_size_limits_create_from_json(const grpc_json* json) { + int max_request_message_bytes = -1; + int max_response_message_bytes = -1; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) continue; + if (strcmp(field->key, "maxRequestMessageBytes") == 0) { + if (max_request_message_bytes >= 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + max_request_message_bytes = gpr_parse_nonnegative_int(field->value); + if (max_request_message_bytes == -1) return NULL; + } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) { + if (max_response_message_bytes >= 0) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + max_response_message_bytes = gpr_parse_nonnegative_int(field->value); + if (max_response_message_bytes == -1) return NULL; + } + } message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); - const int32_t* max_request_message_bytes = - grpc_method_config_get_max_request_message_bytes(method_config); - value->max_send_size = - max_request_message_bytes != NULL ? *max_request_message_bytes : -1; - const int32_t* max_response_message_bytes = - grpc_method_config_get_max_response_message_bytes(method_config); - value->max_recv_size = - max_response_message_bytes != NULL ? *max_response_message_bytes : -1; + value->max_send_size = max_request_message_bytes; + value->max_recv_size = max_response_message_bytes; return value; } @@ -201,20 +198,20 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, GPR_ASSERT(!args->is_last); channel_data* chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); - chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH; + chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; + chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, - INT_MAX}; + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; chand->max_send_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, - INT_MAX}; + const grpc_integer_options options = { + GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; chand->max_recv_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } @@ -223,10 +220,16 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, const grpc_arg* channel_arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); if (channel_arg != NULL) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); - chand->method_limit_table = grpc_method_config_table_convert( - (grpc_method_config_table*)channel_arg->value.pointer.p, - method_config_convert_value, &message_size_limits_vtable); + GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + grpc_service_config* service_config = + grpc_service_config_create(channel_arg->value.string); + if (service_config != NULL) { + chand->method_limit_table = + grpc_service_config_create_method_config_table( + service_config, message_size_limits_create_from_json, + &message_size_limits_vtable); + grpc_service_config_destroy(service_config); + } } } diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index a4b6668276..2d300f4560 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -66,6 +66,8 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) { return ep->vtable->get_peer(ep); } +int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); } + grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { return ep->vtable->get_workqueue(ep); } diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index bf211ca16a..1609b64f2b 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -61,6 +61,7 @@ struct grpc_endpoint_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); + int (*get_fd)(grpc_endpoint *ep); }; /* When data is available on the connection, calls the callback with slices. @@ -73,6 +74,10 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, char *grpc_endpoint_get_peer(grpc_endpoint *ep); +/* Get the file descriptor used by \a ep. Return -1 if \a ep is not using an fd. + */ +int grpc_endpoint_get_fd(grpc_endpoint *ep); + /* Retrieve a reference to the workqueue associated with this endpoint */ grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index db51ec4939..07fbfd849e 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -72,6 +72,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */ static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; +/* TODO: sreek: Right now, this wakes up all pollers. In future we should make + * sure to wake up one polling thread (which can wake up other threads if + * needed) */ +static grpc_wakeup_fd global_wakeup_fd; + /* Implements the function defined in grpc_posix.h. This function might be * called before even calling grpc_init() to set either a different signal to * use. If signum == -1, then the use of signals is disabled */ @@ -163,7 +168,7 @@ static void fd_global_shutdown(void); #define PI_ADD_REF(p, r) pi_add_ref((p)) #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) -#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ +#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */ /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { @@ -437,9 +442,8 @@ static void polling_island_add_wakeup_fd_locked(polling_island *pi, gpr_asprintf(&err_msg, "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with " "error: %d (%s)", - pi->epoll_fd, - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno, - strerror(errno)); + pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd), + errno, strerror(errno)); append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); gpr_free(err_msg); } @@ -541,7 +545,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, goto done; } - polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error); polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error); if (initial_fd != NULL) { @@ -843,11 +847,6 @@ static void polling_island_global_shutdown() { * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a * case occurs. */ -/* TODO: sreek: Right now, this wakes up all pollers. In future we should make - * sure to wake up one polling thread (which can wake up other threads if - * needed) */ -grpc_wakeup_fd grpc_global_wakeup_fd; - static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; @@ -1163,11 +1162,11 @@ static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); poller_kick_init(); - return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&global_wakeup_fd); } static void pollset_global_shutdown(void) { - grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + grpc_wakeup_fd_destroy(&global_wakeup_fd); gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); } @@ -1274,7 +1273,7 @@ static grpc_error *pollset_kick(grpc_pollset *p, } static grpc_error *kick_poller(void) { - return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { @@ -1501,13 +1500,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &grpc_global_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), + if (data_ptr == &global_wakeup_fd) { + append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), + append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); maybe_do_workqueue_work(exec_ctx, pi); } else if (data_ptr == &polling_island_wakeup_fd) { diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c deleted file mode 100644 index bf51404203..0000000000 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ /dev/null @@ -1,2076 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -/* This file will be removed shortly: it's here to keep refactoring - * steps simple and auditable. - * It's the combination of the old files: - * - fd_posix.{h,c} - * - pollset_posix.{h,c} - * - pullset_multipoller_with_{poll,epoll}.{h,c} - * The new version will be split into: - * - ev_poll_posix.{h,c} - * - ev_epoll_posix.{h,c} - */ - -#include "src/core/lib/iomgr/port.h" - -#ifdef GRPC_POSIX_SOCKET - -#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h" - -#include <assert.h> -#include <errno.h> -#include <poll.h> -#include <string.h> -#include <sys/socket.h> -#include <unistd.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/tls.h> -#include <grpc/support/useful.h> - -#include "src/core/lib/iomgr/iomgr_internal.h" -#include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" - -/******************************************************************************* - * FD declarations - */ - -typedef struct grpc_fd_watcher { - struct grpc_fd_watcher *next; - struct grpc_fd_watcher *prev; - grpc_pollset *pollset; - grpc_pollset_worker *worker; - grpc_fd *fd; -} grpc_fd_watcher; - -struct grpc_fd { - int fd; - /* refst format: - bit0: 1=active/0=orphaned - bit1-n: refcount - meaning that mostly we ref by two to avoid altering the orphaned bit, - and just unref by 1 when we're ready to flag the object as orphaned */ - gpr_atm refst; - - gpr_mu mu; - int shutdown; - int closed; - int released; - - /* The watcher list. - - The following watcher related fields are protected by watcher_mu. - - An fd_watcher is an ephemeral object created when an fd wants to - begin polling, and destroyed after the poll. - - It denotes the fd's interest in whether to read poll or write poll - or both or neither on this fd. - - If a watcher is asked to poll for reads or writes, the read_watcher - or write_watcher fields are set respectively. A watcher may be asked - to poll for both, in which case both fields will be set. - - read_watcher and write_watcher may be NULL if no watcher has been - asked to poll for reads or writes. - - If an fd_watcher is not asked to poll for reads or writes, it's added - to a linked list of inactive watchers, rooted at inactive_watcher_root. - If at a later time there becomes need of a poller to poll, one of - the inactive pollers may be kicked out of their poll loops to take - that responsibility. */ - grpc_fd_watcher inactive_watcher_root; - grpc_fd_watcher *read_watcher; - grpc_fd_watcher *write_watcher; - - grpc_closure *read_closure; - grpc_closure *write_closure; - - struct grpc_fd *freelist_next; - - grpc_closure *on_done_closure; - - grpc_iomgr_object iomgr_object; - - /* The pollset that last noticed and notified that the fd is readable */ - grpc_pollset *read_notifier_pollset; -}; - -/* Begin polling on an fd. - Registers that the given pollset is interested in this fd - so that if read - or writability interest changes, the pollset can be kicked to pick up that - new interest. - Return value is: - (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0) - i.e. a combination of read_mask and write_mask determined by the fd's current - interest in said events. - Polling strategies that do not need to alter their behavior depending on the - fd's current interest (such as epoll) do not need to call this function. - MUST NOT be called with a pollset lock taken */ -static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - grpc_pollset_worker *worker, uint32_t read_mask, - uint32_t write_mask, grpc_fd_watcher *rec); -/* Complete polling previously started with fd_begin_poll - MUST NOT be called with a pollset lock taken - if got_read or got_write are 1, also does the become_{readable,writable} as - appropriate. */ -static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, - int got_read, int got_write, - grpc_pollset *read_notifier_pollset); - -/* Return 1 if this fd is orphaned, 0 otherwise */ -static bool fd_is_orphaned(grpc_fd *fd); - -/* Reference counting for fds */ -/*#define GRPC_FD_REF_COUNT_DEBUG*/ -#ifdef GRPC_FD_REF_COUNT_DEBUG -static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); -static void fd_unref(grpc_fd *fd, const char *reason, const char *file, - int line); -#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__) -#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__) -#else -static void fd_ref(grpc_fd *fd); -static void fd_unref(grpc_fd *fd); -#define GRPC_FD_REF(fd, reason) fd_ref(fd) -#define GRPC_FD_UNREF(fd, reason) fd_unref(fd) -#endif - -static void fd_global_init(void); -static void fd_global_shutdown(void); - -#define CLOSURE_NOT_READY ((grpc_closure *)0) -#define CLOSURE_READY ((grpc_closure *)1) - -/******************************************************************************* - * pollset declarations - */ - -typedef struct grpc_pollset_vtable grpc_pollset_vtable; - -typedef struct grpc_cached_wakeup_fd { - grpc_wakeup_fd fd; - struct grpc_cached_wakeup_fd *next; -} grpc_cached_wakeup_fd; - -struct grpc_pollset_worker { - grpc_cached_wakeup_fd *wakeup_fd; - int reevaluate_polling_on_wakeup; - int kicked_specifically; - struct grpc_pollset_worker *next; - struct grpc_pollset_worker *prev; -}; - -struct grpc_pollset { - /* pollsets under posix can mutate representation as fds are added and - removed. - For example, we may choose a poll() based implementation on linux for - few fds, and an epoll() based implementation for many fds */ - const grpc_pollset_vtable *vtable; - gpr_mu mu; - grpc_pollset_worker root_worker; - int in_flight_cbs; - int shutting_down; - int called_shutdown; - int kicked_without_pollers; - grpc_closure *shutdown_done; - grpc_closure_list idle_jobs; - union { - int fd; - void *ptr; - } data; - /* Local cache of eventfds for workers */ - grpc_cached_wakeup_fd *local_wakeup_cache; -}; - -struct grpc_pollset_vtable { - void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - struct grpc_fd *fd, int and_unlock_pollset); - grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); - void (*finish_shutdown)(grpc_pollset *pollset); - void (*destroy)(grpc_pollset *pollset); -}; - -/* Add an fd to a pollset */ -static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - struct grpc_fd *fd); - -static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd); - -/* Convert a timespec to milliseconds: - - very small or negative poll times are clamped to zero to do a - non-blocking poll (which becomes spin polling) - - other small values are rounded up to one millisecond - - longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); - -/* Allow kick to wakeup the currently polling worker */ -#define GRPC_POLLSET_CAN_KICK_SELF 1 -/* Force the wakee to repoll when awoken */ -#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 -/* As per pollset_kick, with an extended set of flags (defined above) - -- mostly for fd_posix's use. */ -static grpc_error *pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) GRPC_MUST_USE_RESULT; - -/* turn a pollset into a multipoller: platform specific */ -typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - struct grpc_fd **fds, - size_t fd_count); -static platform_become_multipoller_type platform_become_multipoller; - -/* Return 1 if the pollset has active threads in pollset_work (pollset must - * be locked) */ -static int pollset_has_workers(grpc_pollset *pollset); - -static void remove_fd_from_all_epoll_sets(int fd); - -/******************************************************************************* - * pollset_set definitions - */ - -struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -}; - -/******************************************************************************* - * fd_posix.c - */ - -/* We need to keep a freelist not because of any concerns of malloc performance - * but instead so that implementations with multiple threads in (for example) - * epoll_wait deal with the race between pollset removal and incoming poll - * notifications. - * - * The problem is that the poller ultimately holds a reference to this - * object, so it is very difficult to know when is safe to free it, at least - * without some expensive synchronization. - * - * If we keep the object freelisted, in the worst case losing this race just - * becomes a spurious read notification on a reused fd. - */ -/* TODO(klempner): We could use some form of polling generation count to know - * when these are safe to free. */ -/* TODO(klempner): Consider disabling freelisting if we don't have multiple - * threads in poll on the same fd */ -/* TODO(klempner): Batch these allocations to reduce fragmentation */ -static grpc_fd *fd_freelist = NULL; -static gpr_mu fd_freelist_mu; - -static void freelist_fd(grpc_fd *fd) { - gpr_mu_lock(&fd_freelist_mu); - fd->freelist_next = fd_freelist; - fd_freelist = fd; - grpc_iomgr_unregister_object(&fd->iomgr_object); - gpr_mu_unlock(&fd_freelist_mu); -} - -static grpc_fd *alloc_fd(int fd) { - grpc_fd *r = NULL; - gpr_mu_lock(&fd_freelist_mu); - if (fd_freelist != NULL) { - r = fd_freelist; - fd_freelist = fd_freelist->freelist_next; - } - gpr_mu_unlock(&fd_freelist_mu); - if (r == NULL) { - r = gpr_malloc(sizeof(grpc_fd)); - gpr_mu_init(&r->mu); - } - - gpr_mu_lock(&r->mu); - gpr_atm_rel_store(&r->refst, 1); - r->shutdown = 0; - r->read_closure = CLOSURE_NOT_READY; - r->write_closure = CLOSURE_NOT_READY; - r->fd = fd; - r->inactive_watcher_root.next = r->inactive_watcher_root.prev = - &r->inactive_watcher_root; - r->freelist_next = NULL; - r->read_watcher = r->write_watcher = NULL; - r->on_done_closure = NULL; - r->closed = 0; - r->released = 0; - r->read_notifier_pollset = NULL; - gpr_mu_unlock(&r->mu); - return r; -} - -static void destroy(grpc_fd *fd) { - gpr_mu_destroy(&fd->mu); - gpr_free(fd); -} - -#ifdef GRPC_FD_REF_COUNT_DEBUG -#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) -#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) -static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, - int line) { - gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); -#else -#define REF_BY(fd, n, reason) ref_by(fd, n) -#define UNREF_BY(fd, n, reason) unref_by(fd, n) -static void ref_by(grpc_fd *fd, int n) { -#endif - GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); -} - -#ifdef GRPC_FD_REF_COUNT_DEBUG -static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, - int line) { - gpr_atm old; - gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); -#else -static void unref_by(grpc_fd *fd, int n) { - gpr_atm old; -#endif - old = gpr_atm_full_fetch_add(&fd->refst, -n); - if (old == n) { - freelist_fd(fd); - } else { - GPR_ASSERT(old > n); - } -} - -static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } - -static void fd_global_shutdown(void) { - gpr_mu_lock(&fd_freelist_mu); - gpr_mu_unlock(&fd_freelist_mu); - while (fd_freelist != NULL) { - grpc_fd *fd = fd_freelist; - fd_freelist = fd_freelist->freelist_next; - destroy(fd); - } - gpr_mu_destroy(&fd_freelist_mu); -} - -static grpc_fd *fd_create(int fd, const char *name) { - grpc_fd *r = alloc_fd(fd); - char *name2; - gpr_asprintf(&name2, "%s fd=%d", name, fd); - grpc_iomgr_register_object(&r->iomgr_object, name2); - gpr_free(name2); -#ifdef GRPC_FD_REF_COUNT_DEBUG - gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); -#endif - return r; -} - -static bool fd_is_orphaned(grpc_fd *fd) { - return (gpr_atm_acq_load(&fd->refst) & 1) == 0; -} - -static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(&watcher->pollset->mu); - GPR_ASSERT(watcher->worker); - grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(&watcher->pollset->mu); - return err; -} - -static void maybe_wake_one_watcher_locked(grpc_fd *fd) { - if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - pollset_kick_locked(fd->inactive_watcher_root.next); - } else if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher); - } else if (fd->write_watcher) { - pollset_kick_locked(fd->write_watcher); - } -} - -static void wake_all_watchers_locked(grpc_fd *fd) { - grpc_fd_watcher *watcher; - for (watcher = fd->inactive_watcher_root.next; - watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - pollset_kick_locked(watcher); - } - if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher); - } - if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - pollset_kick_locked(fd->write_watcher); - } -} - -static int has_watchers(grpc_fd *fd) { - return fd->read_watcher != NULL || fd->write_watcher != NULL || - fd->inactive_watcher_root.next != &fd->inactive_watcher_root; -} - -static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - fd->closed = 1; - if (!fd->released) { - close(fd->fd); - } else { - remove_fd_from_all_epoll_sets(fd->fd); - } - grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); -} - -static int fd_wrapped_fd(grpc_fd *fd) { - if (fd->released || fd->closed) { - return -1; - } else { - return fd->fd; - } -} - -static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure *on_done, int *release_fd, - const char *reason) { - fd->on_done_closure = on_done; - fd->released = release_fd != NULL; - if (!fd->released) { - shutdown(fd->fd, SHUT_RDWR); - } else { - *release_fd = fd->fd; - } - gpr_mu_lock(&fd->mu); - REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ - if (!has_watchers(fd)) { - close_fd_locked(exec_ctx, fd); - } else { - wake_all_watchers_locked(fd); - } - gpr_mu_unlock(&fd->mu); - UNREF_BY(fd, 2, reason); /* drop the reference */ -} - -/* increment refcount by two to avoid changing the orphan bit */ -#ifdef GRPC_FD_REF_COUNT_DEBUG -static void fd_ref(grpc_fd *fd, const char *reason, const char *file, - int line) { - ref_by(fd, 2, reason, file, line); -} - -static void fd_unref(grpc_fd *fd, const char *reason, const char *file, - int line) { - unref_by(fd, 2, reason, file, line); -} -#else -static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } - -static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -#endif - -static grpc_error *fd_shutdown_error(bool shutdown) { - if (!shutdown) { - return GRPC_ERROR_NONE; - } else { - return GRPC_ERROR_CREATE("FD shutdown"); - } -} - -static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st, grpc_closure *closure) { - if (fd->shutdown) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"), - NULL); - } else if (*st == CLOSURE_NOT_READY) { - /* not ready ==> switch to a waiting state by setting the closure */ - *st = closure; - } else if (*st == CLOSURE_READY) { - /* already ready ==> queue the closure to run immediately */ - *st = CLOSURE_NOT_READY; - grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), - NULL); - maybe_wake_one_watcher_locked(fd); - } else { - /* upcallptr was set to a different closure. This is an error! */ - gpr_log(GPR_ERROR, - "User called a notify_on function with a previous callback still " - "pending"); - abort(); - } -} - -/* returns 1 if state becomes not ready */ -static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st) { - if (*st == CLOSURE_READY) { - /* duplicate ready ==> ignore */ - return 0; - } else if (*st == CLOSURE_NOT_READY) { - /* not ready, and not waiting ==> flag ready */ - *st = CLOSURE_READY; - return 0; - } else { - /* waiting ==> queue closure */ - grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); - *st = CLOSURE_NOT_READY; - return 1; - } -} - -static void set_read_notifier_pollset_locked( - grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) { - fd->read_notifier_pollset = read_notifier_pollset; -} - -static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - gpr_mu_lock(&fd->mu); - /* only shutdown once */ - if (!fd->shutdown) { - fd->shutdown = 1; - /* signal read/write closed to OS so that future operations fail */ - shutdown(fd->fd, SHUT_RDWR); - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); - } - gpr_mu_unlock(&fd->mu); -} - -static bool fd_is_shutdown(grpc_fd *fd) { - gpr_mu_lock(&fd->mu); - bool r = fd->shutdown; - gpr_mu_unlock(&fd->mu); - return r; -} - -static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure *closure) { - gpr_mu_lock(&fd->mu); - notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); - gpr_mu_unlock(&fd->mu); -} - -static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure *closure) { - gpr_mu_lock(&fd->mu); - notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); - gpr_mu_unlock(&fd->mu); -} - -/* Return the read-notifier pollset */ -static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, - grpc_fd *fd) { - grpc_pollset *notifier = NULL; - - gpr_mu_lock(&fd->mu); - notifier = fd->read_notifier_pollset; - gpr_mu_unlock(&fd->mu); - - return notifier; -} - -static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, - grpc_pollset_worker *worker, uint32_t read_mask, - uint32_t write_mask, grpc_fd_watcher *watcher) { - uint32_t mask = 0; - grpc_closure *cur; - int requested; - /* keep track of pollers that have requested our events, in case they change - */ - GRPC_FD_REF(fd, "poll"); - - gpr_mu_lock(&fd->mu); - - /* if we are shutdown, then don't add to the watcher set */ - if (fd->shutdown) { - watcher->fd = NULL; - watcher->pollset = NULL; - watcher->worker = NULL; - gpr_mu_unlock(&fd->mu); - GRPC_FD_UNREF(fd, "poll"); - return 0; - } - - /* if there is nobody polling for read, but we need to, then start doing so */ - cur = fd->read_closure; - requested = cur != CLOSURE_READY; - if (read_mask && fd->read_watcher == NULL && requested) { - fd->read_watcher = watcher; - mask |= read_mask; - } - /* if there is nobody polling for write, but we need to, then start doing so - */ - cur = fd->write_closure; - requested = cur != CLOSURE_READY; - if (write_mask && fd->write_watcher == NULL && requested) { - fd->write_watcher = watcher; - mask |= write_mask; - } - /* if not polling, remember this watcher in case we need someone to later */ - if (mask == 0 && worker != NULL) { - watcher->next = &fd->inactive_watcher_root; - watcher->prev = watcher->next->prev; - watcher->next->prev = watcher->prev->next = watcher; - } - watcher->pollset = pollset; - watcher->worker = worker; - watcher->fd = fd; - gpr_mu_unlock(&fd->mu); - - return mask; -} - -static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, - int got_read, int got_write, - grpc_pollset *read_notifier_pollset) { - int was_polling = 0; - int kick = 0; - grpc_fd *fd = watcher->fd; - - if (fd == NULL) { - return; - } - - gpr_mu_lock(&fd->mu); - - if (watcher == fd->read_watcher) { - /* remove read watcher, kick if we still need a read */ - was_polling = 1; - if (!got_read) { - kick = 1; - } - fd->read_watcher = NULL; - } - if (watcher == fd->write_watcher) { - /* remove write watcher, kick if we still need a write */ - was_polling = 1; - if (!got_write) { - kick = 1; - } - fd->write_watcher = NULL; - } - if (!was_polling && watcher->worker != NULL) { - /* remove from inactive list */ - watcher->next->prev = watcher->prev; - watcher->prev->next = watcher->next; - } - if (got_read) { - if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { - kick = 1; - } - - if (read_notifier_pollset != NULL) { - set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); - } - } - if (got_write) { - if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { - kick = 1; - } - } - if (kick) { - maybe_wake_one_watcher_locked(fd); - } - if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { - close_fd_locked(exec_ctx, fd); - } - gpr_mu_unlock(&fd->mu); - - GRPC_FD_UNREF(fd, "poll"); -} - -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } - -/******************************************************************************* - * pollset_posix.c - */ - -GPR_TLS_DECL(g_current_thread_poller); -GPR_TLS_DECL(g_current_thread_worker); - -/** The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). - * This wakeup_fd gives us something to alert on when such a case occurs. */ -grpc_wakeup_fd grpc_global_wakeup_fd; - -static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev->next = worker->next; - worker->next->prev = worker->prev; -} - -static int pollset_has_workers(grpc_pollset *p) { - return p->root_worker.next != &p->root_worker; -} - -static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { - if (pollset_has_workers(p)) { - grpc_pollset_worker *w = p->root_worker.next; - remove_worker(p, w); - return w; - } else { - return NULL; - } -} - -static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->next = &p->root_worker; - worker->prev = worker->next->prev; - worker->prev->next = worker->next->prev = worker; -} - -static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev = &p->root_worker; - worker->next = worker->prev->next; - worker->prev->next = worker->next->prev = worker; -} - -static void kick_append_error(grpc_error **composite, grpc_error *error) { - if (error == GRPC_ERROR_NONE) return; - if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE("Kick Failure"); - } - *composite = grpc_error_add_child(*composite, error); -} - -static grpc_error *pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { - GPR_TIMER_BEGIN("pollset_kick_ext", 0); - grpc_error *error = GRPC_ERROR_NONE; - - /* pollset->mu already held */ - if (specific_worker != NULL) { - if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { - GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0); - GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); - for (specific_worker = p->root_worker.next; - specific_worker != &p->root_worker; - specific_worker = specific_worker->next) { - kick_append_error( - &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); - } - p->kicked_without_pollers = true; - GPR_TIMER_END("pollset_kick_ext.broadcast", 0); - } else if (gpr_tls_get(&g_current_thread_worker) != - (intptr_t)specific_worker) { - GPR_TIMER_MARK("different_thread_worker", 0); - if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = true; - } - specific_worker->kicked_specifically = true; - kick_append_error(&error, - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); - } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { - GPR_TIMER_MARK("kick_yoself", 0); - if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = true; - } - specific_worker->kicked_specifically = true; - kick_append_error(&error, - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); - } - } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { - GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); - GPR_TIMER_MARK("kick_anonymous", 0); - specific_worker = pop_front_worker(p); - if (specific_worker != NULL) { - if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - GPR_TIMER_MARK("kick_anonymous_not_self", 0); - push_back_worker(p, specific_worker); - specific_worker = pop_front_worker(p); - if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && - gpr_tls_get(&g_current_thread_worker) == - (intptr_t)specific_worker) { - push_back_worker(p, specific_worker); - specific_worker = NULL; - } - } - if (specific_worker != NULL) { - GPR_TIMER_MARK("finally_kick", 0); - push_back_worker(p, specific_worker); - kick_append_error( - &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); - } - } else { - GPR_TIMER_MARK("kicked_no_pollers", 0); - p->kicked_without_pollers = true; - } - } - - GPR_TIMER_END("pollset_kick_ext", 0); - return error; -} - -static grpc_error *pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - return pollset_kick_ext(p, specific_worker, 0); -} - -/* global state management */ - -static grpc_error *pollset_global_init(void) { - gpr_tls_init(&g_current_thread_poller); - gpr_tls_init(&g_current_thread_worker); - return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); -} - -static void pollset_global_shutdown(void) { - grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); - gpr_tls_destroy(&g_current_thread_poller); - gpr_tls_destroy(&g_current_thread_worker); -} - -static grpc_error *kick_poller(void) { - return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); -} - -/* main interface */ - -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); - -static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - gpr_mu_init(&pollset->mu); - *mu = &pollset->mu; - pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->in_flight_cbs = 0; - pollset->shutting_down = 0; - pollset->called_shutdown = 0; - pollset->kicked_without_pollers = 0; - pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; - pollset->local_wakeup_cache = NULL; - pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); -} - -static void pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->in_flight_cbs == 0); - GPR_ASSERT(!pollset_has_workers(pollset)); - GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); - while (pollset->local_wakeup_cache) { - grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; - grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); - gpr_free(pollset->local_wakeup_cache); - pollset->local_wakeup_cache = next; - } - gpr_mu_destroy(&pollset->mu); -} - -static void pollset_reset(grpc_pollset *pollset) { - GPR_ASSERT(pollset->shutting_down); - GPR_ASSERT(pollset->in_flight_cbs == 0); - GPR_ASSERT(!pollset_has_workers(pollset)); - GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); - pollset->shutting_down = 0; - pollset->called_shutdown = 0; - pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); -} - -static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); -/* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to add_fd above is - not respected, the code will deadlock (in a way that we have a chance of - debugging) */ -#ifndef NDEBUG - gpr_mu_lock(&pollset->mu); - gpr_mu_unlock(&pollset->mu); -#endif -} - -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); - pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); -} - -static void work_combine_error(grpc_error **composite, grpc_error *error) { - if (error == GRPC_ERROR_NONE) return; - if (*composite == GRPC_ERROR_NONE) { - *composite = GRPC_ERROR_CREATE("pollset_work"); - } - *composite = grpc_error_add_child(*composite, error); -} - -static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { - grpc_pollset_worker worker; - *worker_hdl = &worker; - grpc_error *error = GRPC_ERROR_NONE; - - /* pollset->mu already held */ - int added_worker = 0; - int locked = 1; - int queued_work = 0; - int keep_polling = 0; - GPR_TIMER_BEGIN("pollset_work", 0); - /* this must happen before we (potentially) drop pollset->mu */ - worker.next = worker.prev = NULL; - worker.reevaluate_polling_on_wakeup = 0; - if (pollset->local_wakeup_cache != NULL) { - worker.wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker.wakeup_fd->next; - } else { - worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); - if (error != GRPC_ERROR_NONE) { - return error; - } - } - worker.kicked_specifically = 0; - /* If there's work waiting for the pollset to be idle, and the - pollset is idle, then do that work */ - if (!pollset_has_workers(pollset) && - !grpc_closure_list_empty(pollset->idle_jobs)) { - GPR_TIMER_MARK("pollset_work.idle_jobs", 0); - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - goto done; - } - /* If we're shutting down then we don't execute any extended work */ - if (pollset->shutting_down) { - GPR_TIMER_MARK("pollset_work.shutting_down", 0); - goto done; - } - /* Give do_promote priority so we don't starve it out */ - if (pollset->in_flight_cbs) { - GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0); - gpr_mu_unlock(&pollset->mu); - locked = 0; - goto done; - } - /* Start polling, and keep doing so while we're being asked to - re-evaluate our pollers (this allows poll() based pollers to - ensure they don't miss wakeups) */ - keep_polling = 1; - while (keep_polling) { - keep_polling = 0; - if (!pollset->kicked_without_pollers) { - if (!added_worker) { - push_front_worker(pollset, &worker); - added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - } - gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); - GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - work_combine_error(&error, - pollset->vtable->maybe_work_and_unlock( - exec_ctx, pollset, &worker, deadline, now)); - GPR_TIMER_END("maybe_work_and_unlock", 0); - locked = 0; - gpr_tls_set(&g_current_thread_poller, 0); - } else { - GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); - pollset->kicked_without_pollers = 0; - } - /* Finished execution - start cleaning up. - Note that we may arrive here from outside the enclosing while() loop. - In that case we won't loop though as we haven't added worker to the - worker list, which means nobody could ask us to re-evaluate polling). */ - done: - if (!locked) { - queued_work |= grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); - locked = 1; - } - /* If we're forced to re-evaluate polling (via pollset_kick with - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force - a loop */ - if (worker.reevaluate_polling_on_wakeup) { - worker.reevaluate_polling_on_wakeup = 0; - pollset->kicked_without_pollers = 0; - if (queued_work || worker.kicked_specifically) { - /* If there's queued work on the list, then set the deadline to be - immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); - } - keep_polling = 1; - } - } - if (added_worker) { - remove_worker(pollset, &worker); - gpr_tls_set(&g_current_thread_worker, 0); - } - /* release wakeup fd to the local pool */ - worker.wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker.wakeup_fd; - /* check shutdown conditions */ - if (pollset->shutting_down) { - if (pollset_has_workers(pollset)) { - pollset_kick(pollset, NULL); - } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { - pollset->called_shutdown = 1; - gpr_mu_unlock(&pollset->mu); - finish_shutdown(exec_ctx, pollset); - grpc_exec_ctx_flush(exec_ctx); - /* Continuing to access pollset here is safe -- it is the caller's - * responsibility to not destroy when it has outstanding calls to - * pollset_work. - * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ - gpr_mu_lock(&pollset->mu); - } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - gpr_mu_unlock(&pollset->mu); - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); - } - } - *worker_hdl = NULL; - GPR_TIMER_END("pollset_work", 0); - return error; -} - -static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_closure *closure) { - GPR_ASSERT(!pollset->shutting_down); - pollset->shutting_down = 1; - pollset->shutdown_done = closure; - pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - if (!pollset_has_workers(pollset)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - } - if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && - !pollset_has_workers(pollset)) { - pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); - } -} - -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); -} - -/* - * basic_pollset - a vtable that provides polling for zero or one file - * descriptor via poll() - */ - -typedef struct grpc_unary_promote_args { - const grpc_pollset_vtable *original_vtable; - grpc_pollset *pollset; - grpc_fd *fd; - grpc_closure promotion_closure; -} grpc_unary_promote_args; - -static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - grpc_error *error) { - grpc_unary_promote_args *up_args = args; - const grpc_pollset_vtable *original_vtable = up_args->original_vtable; - grpc_pollset *pollset = up_args->pollset; - grpc_fd *fd = up_args->fd; - - /* - * This is quite tricky. There are a number of cases to keep in mind here: - * 1. fd may have been orphaned - * 2. The pollset may no longer be a unary poller (and we can't let case #1 - * leak to other pollset types!) - * 3. pollset's fd (which may have changed) may have been orphaned - * 4. The pollset may be shutting down. - */ - - gpr_mu_lock(&pollset->mu); - /* First we need to ensure that nobody is polling concurrently */ - GPR_ASSERT(!pollset_has_workers(pollset)); - - gpr_free(up_args); - /* At this point the pollset may no longer be a unary poller. In that case - * we should just call the right add function and be done. */ - /* TODO(klempner): If we're not careful this could cause infinite recursion. - * That's not a problem for now because empty_pollset has a trivial poller - * and we don't have any mechanism to unbecome multipoller. */ - pollset->in_flight_cbs--; - if (pollset->shutting_down) { - /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { - pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); - } - } else if (fd_is_orphaned(fd)) { - /* Don't try to add it to anything, we'll drop our ref on it below */ - } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); - } else if (fd != pollset->data.ptr) { - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] && !fd_is_orphaned(fds[0])) { - platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - /* Note that it is possible that fds[1] is also orphaned at this point. - * That's okay, we'll correct it at the next add or poll. */ - if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - } - - gpr_mu_unlock(&pollset->mu); - - /* Matching ref in basic_pollset_add_fd */ - GRPC_FD_UNREF(fd, "basicpoll_add"); -} - -static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd, int and_unlock_pollset) { - grpc_unary_promote_args *up_args; - GPR_ASSERT(fd); - if (fd == pollset->data.ptr) goto exit; - - if (!pollset_has_workers(pollset)) { - /* Fast path -- no in flight cbs */ - /* TODO(klempner): Comment this out and fix any test failures or establish - * they are due to timing issues */ - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] == NULL) { - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } else if (!fd_is_orphaned(fds[0])) { - platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - goto exit; - } - - /* Now we need to promote. This needs to happen when we're not polling. Since - * this may be called from poll, the wait needs to happen asynchronously. */ - GRPC_FD_REF(fd, "basicpoll_add"); - pollset->in_flight_cbs++; - up_args = gpr_malloc(sizeof(*up_args)); - up_args->fd = fd; - up_args->original_vtable = pollset->vtable; - up_args->pollset = pollset; - up_args->promotion_closure.cb = basic_do_promote; - up_args->promotion_closure.cb_arg = up_args; - - grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure, - GRPC_ERROR_NONE); - pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } -} - -static grpc_error *basic_pollset_maybe_work_and_unlock( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - struct pollfd pfd[3]; - grpc_fd *fd; - grpc_fd_watcher fd_watcher; - int timeout; - int r; - nfds_t nfds; - grpc_error *error = GRPC_ERROR_NONE; - - fd = pollset->data.ptr; - if (fd && fd_is_orphaned(fd)) { - GRPC_FD_UNREF(fd, "basicpoll"); - fd = pollset->data.ptr = NULL; - } - timeout = poll_deadline_to_millis_timeout(deadline, now); - pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfd[0].events = POLLIN; - pfd[0].revents = 0; - pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfd[1].events = POLLIN; - pfd[1].revents = 0; - nfds = 2; - if (fd) { - pfd[2].fd = fd->fd; - pfd[2].revents = 0; - GRPC_FD_REF(fd, "basicpoll_begin"); - gpr_mu_unlock(&pollset->mu); - pfd[2].events = - (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher); - if (pfd[2].events != 0) { - nfds++; - } - } else { - gpr_mu_unlock(&pollset->mu); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - /* poll fd count (argument 2) is shortened by one if we have no events - to poll on - such that it only includes the kicker */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfd, nfds, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (r < 0) { - if (errno != EINTR) { - work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); - } - if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); - } - } else if (r == 0) { - if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); - } - } else { - if (pfd[0].revents & POLLIN_CHECK) { - work_combine_error(&error, - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); - } - if (pfd[1].revents & POLLIN_CHECK) { - work_combine_error(&error, - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); - } - if (nfds > 2) { - fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, - pfd[2].revents & POLLOUT_CHECK, pollset); - } else if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); - } - } - - if (fd) { - GRPC_FD_UNREF(fd, "basicpoll_begin"); - } - - return error; -} - -static void basic_pollset_destroy(grpc_pollset *pollset) { - if (pollset->data.ptr != NULL) { - GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); - pollset->data.ptr = NULL; - } -} - -static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, - basic_pollset_destroy, basic_pollset_destroy}; - -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { - pollset->vtable = &basic_pollset; - pollset->data.ptr = fd_or_null; - if (fd_or_null != NULL) { - GRPC_FD_REF(fd_or_null, "basicpoll"); - } -} - -/******************************************************************************* - * pollset_multipoller_with_poll_posix.c - */ - -#ifndef GRPC_LINUX_MULTIPOLL_WITH_EPOLL - -typedef struct { - /* all polled fds */ - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; - /* fds that have been removed from the pollset explicitly */ - size_t del_count; - size_t del_capacity; - grpc_fd **dels; -} poll_hdr; - -static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - size_t i; - poll_hdr *h = pollset->data.ptr; - /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ - for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) goto exit; - } - if (h->fd_count == h->fd_capacity) { - h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); - h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); - } - h->fds[h->fd_count++] = fd; - GRPC_FD_REF(fd, "multipoller"); -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } -} - -static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - int timeout; - int r; - size_t i, j, fd_count; - nfds_t pfd_count; - poll_hdr *h; - /* TODO(ctiller): inline some elements to avoid an allocation */ - grpc_fd_watcher *watchers; - struct pollfd *pfds; - grpc_error *error = GRPC_ERROR_NONE; - - h = pollset->data.ptr; - timeout = poll_deadline_to_millis_timeout(deadline, now); - /* TODO(ctiller): perform just one malloc here if we exceed the inline case */ - pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2)); - watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2)); - fd_count = 0; - pfd_count = 2; - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfds[0].events = POLLIN; - pfds[0].revents = 0; - pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfds[1].events = POLLIN; - pfds[1].revents = 0; - for (i = 0; i < h->fd_count; i++) { - int remove = fd_is_orphaned(h->fds[i]); - for (j = 0; !remove && j < h->del_count; j++) { - if (h->fds[i] == h->dels[j]) remove = 1; - } - if (remove) { - GRPC_FD_UNREF(h->fds[i], "multipoller"); - } else { - h->fds[fd_count++] = h->fds[i]; - watchers[pfd_count].fd = h->fds[i]; - GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start"); - pfds[pfd_count].fd = h->fds[i]->fd; - pfds[pfd_count].revents = 0; - pfd_count++; - } - } - for (j = 0; j < h->del_count; j++) { - GRPC_FD_UNREF(h->dels[j], "multipoller_del"); - } - h->del_count = 0; - h->fd_count = fd_count; - gpr_mu_unlock(&pollset->mu); - - for (i = 2; i < pfd_count; i++) { - grpc_fd *fd = watchers[i].fd; - pfds[i].events = (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, - &watchers[i]); - GRPC_FD_UNREF(fd, "multipoller_start"); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfds, pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - - if (r < 0) { - if (errno != EINTR) { - work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); - } - for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); - } - } else if (r == 0) { - for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); - } - } else { - if (pfds[0].revents & POLLIN_CHECK) { - work_combine_error(&error, - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); - } - if (pfds[1].revents & POLLIN_CHECK) { - work_combine_error(&error, - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); - } - for (i = 2; i < pfd_count; i++) { - if (watchers[i].fd == NULL) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); - continue; - } - fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK, pollset); - } - } - - gpr_free(pfds); - gpr_free(watchers); - - return error; -} - -static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { - size_t i; - poll_hdr *h = pollset->data.ptr; - for (i = 0; i < h->fd_count; i++) { - GRPC_FD_UNREF(h->fds[i], "multipoller"); - } - for (i = 0; i < h->del_count; i++) { - GRPC_FD_UNREF(h->dels[i], "multipoller_del"); - } - h->fd_count = 0; - h->del_count = 0; -} - -static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { - poll_hdr *h = pollset->data.ptr; - multipoll_with_poll_pollset_finish_shutdown(pollset); - gpr_free(h->fds); - gpr_free(h->dels); - gpr_free(h); -} - -static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, - multipoll_with_poll_pollset_maybe_work_and_unlock, - multipoll_with_poll_pollset_finish_shutdown, - multipoll_with_poll_pollset_destroy}; - -static void poll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - poll_hdr *h = gpr_malloc(sizeof(poll_hdr)); - pollset->vtable = &multipoll_with_poll_pollset; - pollset->data.ptr = h; - h->fd_count = nfds; - h->fd_capacity = nfds; - h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); - h->del_count = 0; - h->del_capacity = 0; - h->dels = NULL; - for (i = 0; i < nfds; i++) { - h->fds[i] = fds[i]; - GRPC_FD_REF(fds[i], "multipoller"); - } -} - -#endif /* !GRPC_LINUX_MULTIPOLL_WITH_EPOLL */ - -/******************************************************************************* - * pollset_multipoller_with_epoll_posix.c - */ - -#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL - -#include <errno.h> -#include <poll.h> -#include <string.h> -#include <sys/epoll.h> -#include <unistd.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -#include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" - -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, - grpc_pollset *read_notifier_pollset) { - /* only one set_ready can be active at once (but there may be a racing - notify_on) */ - gpr_mu_lock(&fd->mu); - set_ready_locked(exec_ctx, fd, st); - - /* A non-NULL read_notifier_pollset means that the fd is readable. */ - if (read_notifier_pollset != NULL) { - /* Note: Since the fd might be a part of multiple pollsets, this might be - * called multiple times (for each time the fd becomes readable) and it is - * okay to set the fd's read-notifier pollset to anyone of these pollsets */ - set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); - } - - gpr_mu_unlock(&fd->mu); -} - -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_pollset *notifier_pollset) { - set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset); -} - -static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->write_closure, NULL); -} - -struct epoll_fd_list { - int *epoll_fds; - size_t count; - size_t capacity; -}; - -static struct epoll_fd_list epoll_fd_global_list; -static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT; -static gpr_mu epoll_fd_list_mu; - -static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); } - -static void add_epoll_fd_to_global_list(int epoll_fd) { - gpr_once_init(&init_epoll_fd_list_mu, init_mu); - - gpr_mu_lock(&epoll_fd_list_mu); - if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) { - epoll_fd_global_list.capacity = - GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2); - epoll_fd_global_list.epoll_fds = - gpr_realloc(epoll_fd_global_list.epoll_fds, - epoll_fd_global_list.capacity * sizeof(int)); - } - epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd; - gpr_mu_unlock(&epoll_fd_list_mu); -} - -static void remove_epoll_fd_from_global_list(int epoll_fd) { - gpr_mu_lock(&epoll_fd_list_mu); - GPR_ASSERT(epoll_fd_global_list.count > 0); - for (size_t i = 0; i < epoll_fd_global_list.count; i++) { - if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) { - epoll_fd_global_list.epoll_fds[i] = - epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)]; - break; - } - } - gpr_mu_unlock(&epoll_fd_list_mu); -} - -static void remove_fd_from_all_epoll_sets(int fd) { - int err; - gpr_once_init(&init_epoll_fd_list_mu, init_mu); - gpr_mu_lock(&epoll_fd_list_mu); - if (epoll_fd_global_list.count == 0) { - gpr_mu_unlock(&epoll_fd_list_mu); - return; - } - for (size_t i = 0; i < epoll_fd_global_list.count; i++) { - err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL); - if (err < 0 && errno != ENOENT) { - gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd, - strerror(errno)); - } - } - gpr_mu_unlock(&epoll_fd_list_mu); -} - -typedef struct { - grpc_pollset *pollset; - grpc_fd *fd; - grpc_closure closure; -} delayed_add; - -typedef struct { int epoll_fd; } epoll_hdr; - -static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - epoll_hdr *h = pollset->data.ptr; - struct epoll_event ev; - int err; - grpc_fd_watcher watcher; - - /* We pretend to be polling whilst adding an fd to keep the fd from being - closed during the add. This may result in a spurious wakeup being assigned - to this pollset whilst adding, but that should be benign. */ - GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0); - if (watcher.fd != NULL) { - ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = fd; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); - if (err < 0) { - /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ - if (errno != EEXIST) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, - strerror(errno)); - } - } - } - fd_end_poll(exec_ctx, &watcher, 0, 0, NULL); -} - -static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - delayed_add *da = arg; - - if (!fd_is_orphaned(da->fd)) { - finally_add_fd(exec_ctx, da->pollset, da->fd); - } - - gpr_mu_lock(&da->pollset->mu); - da->pollset->in_flight_cbs--; - if (da->pollset->shutting_down) { - /* We don't care about this pollset anymore. */ - if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { - da->pollset->called_shutdown = 1; - grpc_exec_ctx_sched(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE, - NULL); - } - } - gpr_mu_unlock(&da->pollset->mu); - - GRPC_FD_UNREF(da->fd, "delayed_add"); - - gpr_free(da); -} - -static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - finally_add_fd(exec_ctx, pollset, fd); - } else { - delayed_add *da = gpr_malloc(sizeof(*da)); - da->pollset = pollset; - da->fd = fd; - GRPC_FD_REF(fd, "delayed_add"); - grpc_closure_init(&da->closure, perform_delayed_add, da); - pollset->in_flight_cbs++; - grpc_exec_ctx_sched(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL); - } -} - -/* TODO(klempner): We probably want to turn this down a bit */ -#define GRPC_EPOLL_MAX_EVENTS 1000 - -static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now) { - struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; - int ep_rv; - int poll_rv; - epoll_hdr *h = pollset->data.ptr; - int timeout_ms; - struct pollfd pfds[2]; - grpc_error *error = GRPC_ERROR_NONE; - - /* If you want to ignore epoll's ability to sanely handle parallel pollers, - * for a more apples-to-apples performance comparison with poll, add a - * if (pollset->counter != 0) { return 0; } - * here. - */ - - gpr_mu_unlock(&pollset->mu); - - timeout_ms = poll_deadline_to_millis_timeout(deadline, now); - - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfds[0].events = POLLIN; - pfds[0].revents = 0; - pfds[1].fd = h->epoll_fd; - pfds[1].events = POLLIN; - pfds[1].revents = 0; - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - poll_rv = grpc_poll_function(pfds, 2, timeout_ms); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (poll_rv < 0) { - if (errno != EINTR) { - work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); - } - } else if (poll_rv == 0) { - /* do nothing */ - } else { - if (pfds[0].revents) { - work_combine_error(&error, - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); - } - if (pfds[1].revents) { - do { - /* The following epoll_wait never blocks; it has a timeout of 0 */ - ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); - if (ep_rv < 0) { - if (errno != EINTR) { - work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_wait")); - } - } else { - int i; - for (i = 0; i < ep_rv; ++i) { - grpc_fd *fd = ep_ev[i].data.ptr; - /* TODO(klempner): We might want to consider making err and pri - * separate events */ - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (fd == NULL) { - work_combine_error(&error, grpc_wakeup_fd_consume_wakeup( - &grpc_global_wakeup_fd)); - } else { - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } - } - } - } - } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); - } - } - return error; -} - -static void multipoll_with_epoll_pollset_finish_shutdown( - grpc_pollset *pollset) {} - -static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { - epoll_hdr *h = pollset->data.ptr; - close(h->epoll_fd); - remove_epoll_fd_from_global_list(h->epoll_fd); - gpr_free(h); -} - -static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, - multipoll_with_epoll_pollset_maybe_work_and_unlock, - multipoll_with_epoll_pollset_finish_shutdown, - multipoll_with_epoll_pollset_destroy}; - -static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr)); - struct epoll_event ev; - int err; - - pollset->vtable = &multipoll_with_epoll_pollset; - pollset->data.ptr = h; - h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (h->epoll_fd < 0) { - /* TODO(klempner): Fall back to poll here, especially on ENOSYS */ - gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); - abort(); - } - add_epoll_fd_to_global_list(h->epoll_fd); - - ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = NULL; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); - if (err < 0) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), - strerror(errno)); - } - - for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0); - } -} - -#else /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */ - -static void remove_fd_from_all_epoll_sets(int fd) {} - -#endif /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */ - -/******************************************************************************* - * pollset_set_posix.c - */ - -static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); - memset(pollset_set, 0, sizeof(*pollset_set)); - gpr_mu_init(&pollset_set->mu); - return pollset_set; -} - -static void pollset_set_destroy(grpc_pollset_set *pollset_set) { - size_t i; - gpr_mu_destroy(&pollset_set->mu); - for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); - } - gpr_free(pollset_set->pollsets); - gpr_free(pollset_set->pollset_sets); - gpr_free(pollset_set->fds); - gpr_free(pollset_set); -} - -static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { - size_t i, j; - gpr_mu_lock(&pollset_set->mu); - if (pollset_set->pollset_count == pollset_set->pollset_capacity) { - pollset_set->pollset_capacity = - GPR_MAX(8, 2 * pollset_set->pollset_capacity); - pollset_set->pollsets = - gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * - sizeof(*pollset_set->pollsets)); - } - pollset_set->pollsets[pollset_set->pollset_count++] = pollset; - for (i = 0, j = 0; i < pollset_set->fd_count; i++) { - if (fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); - } else { - pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); - pollset_set->fds[j++] = pollset_set->fds[i]; - } - } - pollset_set->fd_count = j; - gpr_mu_unlock(&pollset_set->mu); -} - -static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - for (i = 0; i < pollset_set->pollset_count; i++) { - if (pollset_set->pollsets[i] == pollset) { - pollset_set->pollset_count--; - GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i], - pollset_set->pollsets[pollset_set->pollset_count]); - break; - } - } - gpr_mu_unlock(&pollset_set->mu); -} - -static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - size_t i, j; - gpr_mu_lock(&bag->mu); - if (bag->pollset_set_count == bag->pollset_set_capacity) { - bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = - gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); - } - bag->pollset_sets[bag->pollset_set_count++] = item; - for (i = 0, j = 0; i < bag->fd_count; i++) { - if (fd_is_orphaned(bag->fds[i])) { - GRPC_FD_UNREF(bag->fds[i], "pollset_set"); - } else { - pollset_set_add_fd(exec_ctx, item, bag->fds[i]); - bag->fds[j++] = bag->fds[i]; - } - } - bag->fd_count = j; - gpr_mu_unlock(&bag->mu); -} - -static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - size_t i; - gpr_mu_lock(&bag->mu); - for (i = 0; i < bag->pollset_set_count; i++) { - if (bag->pollset_sets[i] == item) { - bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], - bag->pollset_sets[bag->pollset_set_count]); - break; - } - } - gpr_mu_unlock(&bag->mu); -} - -static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - if (pollset_set->fd_count == pollset_set->fd_capacity) { - pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); - pollset_set->fds = gpr_realloc( - pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); - } - GRPC_FD_REF(fd, "pollset_set"); - pollset_set->fds[pollset_set->fd_count++] = fd; - for (i = 0; i < pollset_set->pollset_count; i++) { - pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); - } - for (i = 0; i < pollset_set->pollset_set_count; i++) { - pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); - } - gpr_mu_unlock(&pollset_set->mu); -} - -static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - for (i = 0; i < pollset_set->fd_count; i++) { - if (pollset_set->fds[i] == fd) { - pollset_set->fd_count--; - GPR_SWAP(grpc_fd *, pollset_set->fds[i], - pollset_set->fds[pollset_set->fd_count]); - GRPC_FD_UNREF(fd, "pollset_set"); - break; - } - } - for (i = 0; i < pollset_set->pollset_set_count; i++) { - pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); - } - gpr_mu_unlock(&pollset_set->mu); -} - -/******************************************************************************* - * workqueue stubs - */ - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - return workqueue; -} -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) {} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - return workqueue; -} -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) {} -#endif - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error) { - grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); -} - -/******************************************************************************* - * event engine binding - */ - -static void shutdown_engine(void) { - fd_global_shutdown(); - pollset_global_shutdown(); -} - -static const grpc_event_engine_vtable vtable = { - .pollset_size = sizeof(grpc_pollset), - - .fd_create = fd_create, - .fd_wrapped_fd = fd_wrapped_fd, - .fd_orphan = fd_orphan, - .fd_shutdown = fd_shutdown, - .fd_is_shutdown = fd_is_shutdown, - .fd_notify_on_read = fd_notify_on_read, - .fd_notify_on_write = fd_notify_on_write, - .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, - - .pollset_init = pollset_init, - .pollset_shutdown = pollset_shutdown, - .pollset_reset = pollset_reset, - .pollset_destroy = pollset_destroy, - .pollset_work = pollset_work, - .pollset_kick = pollset_kick, - .pollset_add_fd = pollset_add_fd, - - .pollset_set_create = pollset_set_create, - .pollset_set_destroy = pollset_set_destroy, - .pollset_set_add_pollset = pollset_set_add_pollset, - .pollset_set_del_pollset = pollset_set_del_pollset, - .pollset_set_add_pollset_set = pollset_set_add_pollset_set, - .pollset_set_del_pollset_set = pollset_set_del_pollset_set, - .pollset_set_add_fd = pollset_set_add_fd, - .pollset_set_del_fd = pollset_set_del_fd, - - .kick_poller = kick_poller, - - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_enqueue = workqueue_enqueue, - - .shutdown_engine = shutdown_engine, -}; - -const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { -#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL - platform_become_multipoller = epoll_become_multipoller; -#else - platform_become_multipoller = poll_become_multipoller; -#endif - fd_global_init(); - pollset_global_init(); - return &vtable; -} - -#endif diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.h b/src/core/lib/iomgr/ev_poll_and_epoll_posix.h deleted file mode 100644 index 06d6dbf29d..0000000000 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H -#define GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H - -#include "src/core/lib/iomgr/ev_posix.h" - -const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void); - -#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index e1d620cfff..21b28e5554 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -120,6 +120,8 @@ struct grpc_fd { grpc_pollset *read_notifier_pollset; }; +static grpc_wakeup_fd global_wakeup_fd; + /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read or writability interest changes, the pollset can be kicked to pick up that @@ -769,17 +771,17 @@ static grpc_error *pollset_kick(grpc_pollset *p, static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); - return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&global_wakeup_fd); } static void pollset_global_shutdown(void) { - grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + grpc_wakeup_fd_destroy(&global_wakeup_fd); gpr_tls_destroy(&g_current_thread_poller); gpr_tls_destroy(&g_current_thread_worker); } static grpc_error *kick_poller(void) { - return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } /* main interface */ @@ -947,7 +949,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, fd_count = 0; pfd_count = 2; - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd); pfds[0].events = POLLIN; pfds[0].revents = 0; pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd); @@ -1001,8 +1003,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { - work_combine_error( - &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { work_combine_error( @@ -1343,6 +1345,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { int res, idx; gpr_cv *pollcv; cv_node *cvn, *prev; + int skip_poll = 0; nfds_t nsockfds = 0; gpr_thd_id t_id; gpr_thd_options opt; @@ -1358,17 +1361,17 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { cvn->cv = pollcv; cvn->next = g_cvfds.cvfds[idx].cvs; g_cvfds.cvfds[idx].cvs = cvn; - // We should return immediately if there are pending events, - // but we still need to call poll() to check for socket events + // Don't bother polling if a wakeup fd is ready if (g_cvfds.cvfds[idx].is_set) { - timeout = 0; + skip_poll = 1; } } else if (fds[i].fd >= 0) { nsockfds++; } } - if (nsockfds > 0) { + res = 0; + if (!skip_poll && nsockfds > 0) { pargs = gpr_malloc(sizeof(struct poll_args)); // Both the main thread and calling thread get a reference gpr_ref_init(&pargs->refcount, 2); @@ -1398,16 +1401,14 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { res = pargs->retval; errno = pargs->err; } else { - res = 0; errno = 0; gpr_atm_no_barrier_store(&pargs->status, CANCELLED); } - } else { + } else if (!skip_poll) { gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); deadline = gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); - res = 0; } idx = 0; @@ -1431,7 +1432,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { fds[i].revents = POLLIN; if (res >= 0) res++; } - } else if (fds[i].fd >= 0 && + } else if (!skip_poll && fds[i].fd >= 0 && gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { fds[i].revents = pargs->fds[idx].revents; idx++; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index ef36ba89b2..ab139895fd 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -45,7 +45,6 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/ev_epoll_linux.h" -#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h" #include "src/core/lib/iomgr/ev_poll_posix.h" #include "src/core/lib/support/env.h" @@ -67,7 +66,6 @@ static const event_engine_factory g_factories[] = { {"epoll", grpc_init_epoll_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, - {"legacy", grpc_init_poll_and_epoll_posix}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 2fdef06838..cb5832539d 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -183,6 +183,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, /* override to allow tests to hook poll() usage */ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; -extern grpc_wakeup_fd grpc_global_wakeup_fd; #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 4fd83e0b22..3470b5ac81 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -108,6 +108,7 @@ void grpc_iomgr_shutdown(void) { NULL)) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(&exec_ctx); + grpc_iomgr_platform_flush(); gpr_mu_lock(&g_mu); continue; } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 051a30baa3..379bf9bd23 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -104,6 +104,9 @@ struct grpc_resource_user { /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer */ grpc_closure *reclaimers[2]; + /* Reclaimers just posted: once we're in the combiner lock, we'll move them + to the array above */ + grpc_closure *new_reclaimers[2]; /* Trampoline closures to finish reclamation and re-enter the quota combiner lock */ grpc_closure post_reclaimer_closure[2]; @@ -418,9 +421,25 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); } +static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + bool destructive) { + grpc_closure *closure = resource_user->new_reclaimers[destructive]; + GPR_ASSERT(closure != NULL); + resource_user->new_reclaimers[destructive] = NULL; + GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); + if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + return false; + } + resource_user->reclaimers[destructive] = closure; + return true; +} + static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; + if (!ru_post_reclaimer(exec_ctx, resource_user, false)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -435,6 +454,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; + if (!ru_post_reclaimer(exec_ctx, resource_user, true)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -649,6 +669,8 @@ grpc_resource_user *grpc_resource_user_create( resource_user->added_to_free_pool = false; resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; + resource_user->new_reclaimers[0] = NULL; + resource_user->new_reclaimers[1] = NULL; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } @@ -748,12 +770,8 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, bool destructive, grpc_closure *closure) { - GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); - if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); - return; - } - resource_user->reclaimers[destructive] = closure; + GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL); + resource_user->new_reclaimers[destructive] = closure; grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE, false); diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c new file mode 100644 index 0000000000..8b1efb6bab --- /dev/null +++ b/src/core/lib/iomgr/socket_mutator.c @@ -0,0 +1,98 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/socket_mutator.h" + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +void grpc_socket_mutator_init(grpc_socket_mutator *mutator, + const grpc_socket_mutator_vtable *vtable) { + mutator->vtable = vtable; + gpr_ref_init(&mutator->refcount, 1); +} + +grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator) { + gpr_ref(&mutator->refcount); + return mutator; +} + +bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd) { + return mutator->vtable->mutate_fd(fd, mutator); +} + +int grpc_socket_mutator_compare(grpc_socket_mutator *a, + grpc_socket_mutator *b) { + int c = GPR_ICMP(a, b); + if (c != 0) { + grpc_socket_mutator *sma = a; + grpc_socket_mutator *smb = b; + c = GPR_ICMP(sma->vtable, smb->vtable); + if (c == 0) { + c = sma->vtable->compare(sma, smb); + } + } + return c; +} + +void grpc_socket_mutator_unref(grpc_socket_mutator *mutator) { + if (gpr_unref(&mutator->refcount)) { + mutator->vtable->destory(mutator); + } +} + +static void *socket_mutator_arg_copy(void *p) { + return grpc_socket_mutator_ref(p); +} + +static void socket_mutator_arg_destroy(void *p) { + grpc_socket_mutator_unref(p); +} + +static int socket_mutator_cmp(void *a, void *b) { + return grpc_socket_mutator_compare((grpc_socket_mutator *)a, + (grpc_socket_mutator *)b); +} + +static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = { + socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp}; + +grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_SOCKET_MUTATOR; + arg.value.pointer.vtable = &socket_mutator_arg_vtable; + arg.value.pointer.p = mutator; + return arg; +} diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h new file mode 100644 index 0000000000..2f5b6c248e --- /dev/null +++ b/src/core/lib/iomgr/socket_mutator.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H +#define GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** The virtual table of grpc_socket_mutator */ +typedef struct { + /** Mutates the socket opitons of \a fd */ + bool (*mutate_fd)(int fd, grpc_socket_mutator *mutator); + /** Compare socket mutator \a a and \a b */ + int (*compare)(grpc_socket_mutator *a, grpc_socket_mutator *b); + /** Destroys the socket mutator instance */ + void (*destory)(grpc_socket_mutator *mutator); +} grpc_socket_mutator_vtable; + +/** The Socket Mutator interface allows changes on socket options */ +struct grpc_socket_mutator { + const grpc_socket_mutator_vtable *vtable; + gpr_refcount refcount; +}; + +/** called by concrete implementations to initialize the base struct */ +void grpc_socket_mutator_init(grpc_socket_mutator *mutator, + const grpc_socket_mutator_vtable *vtable); + +/** Wrap \a mutator as a grpc_arg */ +grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator); + +/** Perform the file descriptor mutation operation of \a mutator on \a fd */ +bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd); + +/** Compare if \a a and \a b are the same mutator or have same settings */ +int grpc_socket_mutator_compare(grpc_socket_mutator *a, grpc_socket_mutator *b); + +grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator); +void grpc_socket_mutator_unref(grpc_socket_mutator *mutator); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index bc28bbe316..88e9ade253 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -209,6 +209,15 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { return GRPC_ERROR_NONE; } +/* set a socket using a grpc_socket_mutator */ +grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) { + GPR_ASSERT(mutator); + if (!grpc_socket_mutator_mutate_fd(mutator, fd)) { + return GRPC_ERROR_CREATE("grpc_socket_mutator failed."); + } + return GRPC_ERROR_NONE; +} + static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT; static int g_ipv6_loopback_available; diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 175fb2b717..e84d3781a1 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -39,7 +39,9 @@ #include <sys/socket.h> #include <unistd.h> +#include <grpc/impl/codegen/grpc_types.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/socket_mutator.h" /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, @@ -88,6 +90,9 @@ grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes); /* Tries to set the socket's receive buffer to given size. */ grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes); +/* Tries to set the socket using a grpc_socket_mutator */ +grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator); + /* An enum to keep track of IPv4/IPv6 socket modes. Currently, this information is only used when a socket is first created, but diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 35f23300dc..54911e0e31 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -76,6 +76,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) { LPFN_DISCONNECTEX DisconnectEx; DWORD ioctl_num_bytes; + gpr_mu_lock(&winsocket->state_mu); + if (winsocket->shutdown_called) { + gpr_mu_unlock(&winsocket->state_mu); + return; + } + winsocket->shutdown_called = true; + gpr_mu_unlock(&winsocket->state_mu); + status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx), &ioctl_num_bytes, NULL, NULL); diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index 490d0e0a06..a3875ce16c 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -87,6 +87,7 @@ typedef struct grpc_winsocket { grpc_winsocket_callback_info read_info; gpr_mu state_mu; + bool shutdown_called; /* You can't add the same socket twice to the same IO Completion Port. This prevents that. */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 18e6e60ebc..0485661316 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H #define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H +#include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/time.h> #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/pollset_set.h" diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index bc08c94ee0..a3a70a8ed7 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -51,6 +51,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/socket_mutator.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/iomgr/timer.h" @@ -73,7 +74,8 @@ typedef struct { grpc_channel_args *channel_args; } async_connect; -static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) { +static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd, + const grpc_channel_args *channel_args) { grpc_error *err = GRPC_ERROR_NONE; GPR_ASSERT(fd >= 0); @@ -88,6 +90,16 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) { } err = grpc_set_socket_no_sigpipe_if_possible(fd); if (err != GRPC_ERROR_NONE) goto error; + if (channel_args) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) { + GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER); + grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p; + err = grpc_set_socket_with_mutator(fd, mutator); + if (err != GRPC_ERROR_NONE) goto error; + } + } + } goto done; error: @@ -239,8 +251,11 @@ finish: done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, - "Failed to connect to remote host"); + char *error_descr; + gpr_asprintf(&error_descr, "Failed to connect to remote host: %s", + grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION)); + error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr); + gpr_free(error_descr); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str); } @@ -287,7 +302,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy)); addr = &addr4_copy; } - if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { + if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 4d1e809872..1127588ebc 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -107,18 +107,22 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { gpr_mu_lock(&ac->mu); - if (error == GRPC_ERROR_NONE && socket != NULL) { - DWORD transfered_bytes = 0; - DWORD flags; - BOOL wsa_success = - WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped, - &transfered_bytes, FALSE, &flags); - GPR_ASSERT(transfered_bytes == 0); - if (!wsa_success) { - error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); + if (error == GRPC_ERROR_NONE) { + if (socket != NULL) { + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = + WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped, + &transfered_bytes, FALSE, &flags); + GPR_ASSERT(transfered_bytes == 0); + if (!wsa_success) { + error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); + } else { + *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); + socket = NULL; + } } else { - *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); - socket = NULL; + error = GRPC_ERROR_CREATE("socket is null"); } } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index c48f4e83cf..12a4797e6f 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -493,6 +493,11 @@ static char *tcp_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } +static int tcp_get_fd(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return tcp->fd; +} + static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return grpc_fd_get_workqueue(tcp->em_fd); @@ -511,7 +516,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_shutdown, tcp_destroy, tcp_get_resource_user, - tcp_get_peer}; + tcp_get_peer, + tcp_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_resource_quota *resource_quota, diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index ae54c70d2d..b8a391c059 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -73,6 +73,7 @@ struct grpc_tcp_listener { /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; int shutting_down; + int outstanding_calls; /* closure for socket notification of accept being ready */ grpc_closure on_accept; /* linked list */ @@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); - } +static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_tcp_server *s = arg; /* Now that the accepts have been aborted, we can destroy the sockets. The IOCP won't get notified on these, so we can flag them as already @@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(s); } +static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) { + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } + + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s), + GRPC_ERROR_NONE, NULL); +} + grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { gpr_ref_non_zero(&s->refs); return s; @@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* First, shutdown all fd's. This will queue abortion calls for all of the pending accepts due to the normal operation mechanism. */ if (s->active_ports == 0) { - immediately_done = 1; - } - for (sp = s->head; sp; sp = sp->next) { - sp->shutting_down = 1; - grpc_winsocket_shutdown(sp->socket); + finish_shutdown_locked(exec_ctx, s); + } else { + for (sp = s->head; sp; sp = sp->next) { + sp->shutting_down = 1; + grpc_winsocket_shutdown(sp->socket); + } } gpr_mu_unlock(&s->mu); - - if (immediately_done) { - finish_shutdown(exec_ctx, s); - } } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -251,31 +258,30 @@ failure: return error; } -static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, - grpc_tcp_listener *sp) { +static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx, + grpc_tcp_listener *sp) { int notify = 0; sp->shutting_down = 0; - gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); if (0 == --sp->server->active_ports) { - notify = 1; - } - gpr_mu_unlock(&sp->server->mu); - if (notify) { - finish_shutdown(exec_ctx, sp->server); + finish_shutdown_locked(exec_ctx, sp->server); } } /* In order to do an async accept, we need to create a socket first which will be the one assigned to the new incoming connection. */ -static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, - grpc_tcp_listener *port) { +static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx, + grpc_tcp_listener *port) { SOCKET sock = INVALID_SOCKET; BOOL success; DWORD addrlen = sizeof(struct sockaddr_in6) + 16; DWORD bytes_received = 0; grpc_error *error = GRPC_ERROR_NONE; + if (port->shutting_down) { + return GRPC_ERROR_NONE; + } + sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (sock == INVALID_SOCKET) { @@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx, immediately process an accept that happened in the meantime. */ port->new_socket = sock; grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept); + port->outstanding_calls++; return error; failure: GPR_ASSERT(error != GRPC_ERROR_NONE); - if (port->shutting_down) { - /* We are abandoning the listener port, take that into account to prevent - occasional hangs on shutdown. The hang happens when sp->shutting_down - change is not seen by on_accept and we proceed to trying new accept, - but we fail there because the listening port has been closed in the - meantime. */ - decrement_active_ports_and_notify(exec_ctx, port); - GRPC_ERROR_UNREF(error); - return GRPC_ERROR_NONE; - } if (sock != INVALID_SOCKET) closesocket(sock); return error; } @@ -338,6 +335,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { BOOL wsa_success; int err; + gpr_mu_lock(&sp->server->mu); + peer_name.len = sizeof(struct sockaddr_storage); /* The general mechanism for shutting down is to queue abortion calls. While @@ -347,6 +346,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { const char *msg = grpc_error_string(error); gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg); grpc_error_free_string(msg); + gpr_mu_unlock(&sp->server->mu); return; } @@ -356,17 +356,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, &transfered_bytes, FALSE, &flags); if (!wsa_success) { - if (sp->shutting_down) { - /* During the shutdown case, we ARE expecting an error. So that's well, - and we can wake up the shutdown thread. */ - decrement_active_ports_and_notify(exec_ctx, sp); - return; - } else { + if (!sp->shutting_down) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); gpr_free(utf8_message); - closesocket(sock); } + closesocket(sock); } else { if (!sp->shutting_down) { peer_name_string = NULL; @@ -408,7 +403,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next connection. */ - GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp))); + if (0 == --sp->outstanding_calls) { + decrement_active_ports_and_notify_locked(exec_ctx, sp); + } + gpr_mu_unlock(&sp->server->mu); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, @@ -456,6 +456,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp->server = s; sp->socket = grpc_winsocket_create(sock, "listener"); sp->shutting_down = 0; + sp->outstanding_calls = 0; sp->AcceptEx = AcceptEx; sp->new_socket = INVALID_SOCKET; sp->port = port; @@ -553,7 +554,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, s->on_accept_cb = on_accept_cb; s->on_accept_cb_arg = on_accept_cb_arg; for (sp = s->head; sp; sp = sp->next) { - GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp))); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp))); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 8b58c04ff5..6e2ad1dbe9 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -38,14 +38,17 @@ #include <limits.h> #include <string.h> +#include <grpc/slice_buffer.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/network_status_tracker.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_uv.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" int grpc_tcp_trace = 0; @@ -62,15 +65,14 @@ typedef struct { grpc_closure *read_cb; grpc_closure *write_cb; - GRPC_SLICE read_slice; - GRPC_SLICE_buffer *read_slices; - GRPC_SLICE_buffer *write_slices; + grpc_slice read_slice; + grpc_slice_buffer *read_slices; + grpc_slice_buffer *write_slices; uv_buf_t *write_buffers; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; bool shutting_down; - bool resource_user_shutting_down; char *peer_string; grpc_pollset *pollset; @@ -78,23 +80,23 @@ typedef struct { static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } -static void tcp_free(grpc_tcp *tcp) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user); +static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp); - grpc_exec_ctx_finish(&exec_ctx); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) -#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { +#define TCP_UNREF(exec_ctx, tcp, reason) \ + tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) \ + tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -105,11 +107,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -122,7 +124,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, grpc_tcp *tcp = handle->data; (void)suggested_size; tcp->read_slice = grpc_resource_user_slice_malloc( - &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); grpc_exec_ctx_finish(&exec_ctx); @@ -130,7 +132,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, static void read_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - GRPC_SLICE sub; + grpc_slice sub; grpc_error *error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = stream->data; @@ -139,7 +141,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // Nothing happened. Wait for the next callback return; } - TCP_UNREF(tcp, "read"); + TCP_UNREF(&exec_ctx, tcp, "read"); tcp->read_cb = NULL; // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); @@ -147,8 +149,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, error = GRPC_ERROR_CREATE("EOF"); } else if (nread > 0) { // Successful read - sub = GRPC_SLICE_sub_no_ref(tcp->read_slice, 0, (size_t)nread); - GRPC_SLICE_buffer_add(tcp->read_slices, sub); + sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); + grpc_slice_buffer_add(tcp->read_slices, sub); error = GRPC_ERROR_NONE; if (grpc_tcp_trace) { size_t i; @@ -156,8 +158,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, gpr_log(GPR_DEBUG, "read: error=%s", str); grpc_error_free_string(str); for (i = 0; i < tcp->read_slices->count; i++) { - char *dump = gpr_dump_slice(tcp->read_slices->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); + char *dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); @@ -172,14 +174,14 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, } static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - GRPC_SLICE_buffer *read_slices, grpc_closure *cb) { + grpc_slice_buffer *read_slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; - GRPC_SLICE_buffer_reset_and_unref(read_slices); + grpc_slice_buffer_reset_and_unref(read_slices); TCP_REF(tcp, "read"); // TODO(murgatroid99): figure out what the return value here means status = @@ -202,7 +204,7 @@ static void write_callback(uv_write_t *req, int status) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure *cb = tcp->write_cb; tcp->write_cb = NULL; - TCP_UNREF(tcp, "write"); + TCP_UNREF(&exec_ctx, tcp, "write"); if (status == 0) { error = GRPC_ERROR_NONE; } else { @@ -213,28 +215,28 @@ static void write_callback(uv_write_t *req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - grpc_resource_user_free(&exec_ctx, &tcp->resource_user, + grpc_resource_user_free(&exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * tcp->write_slices->count); grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); grpc_exec_ctx_finish(&exec_ctx); } static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - GRPC_SLICE_buffer *write_slices, + grpc_slice_buffer *write_slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; uv_buf_t *buffers; unsigned int buffer_count; unsigned int i; - GRPC_SLICE *slice; + grpc_slice *slice; uv_write_t *write_req; if (grpc_tcp_trace) { size_t j; for (j = 0; j < write_slices->count; j++) { - char *data = gpr_dump_slice(write_slices->slices[j], - GPR_DUMP_HEX | GPR_DUMP_ASCII); + char *data = grpc_dump_slice(write_slices->slices[j], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); gpr_free(data); } @@ -259,7 +261,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); - grpc_resource_user_alloc(exec_ctx, &tcp->resource_user, + grpc_resource_user_alloc(exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; @@ -295,22 +297,6 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void shutdown_callback(uv_shutdown_t *req, int status) {} -static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(arg, "resource_user"); -} - -static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (!tcp->resource_user_shutting_down) { - tcp->resource_user_shutting_down = true; - TCP_REF(tcp, "resource_user"); - grpc_resource_user_shutdown( - exec_ctx, &tcp->resource_user, - grpc_closure_create(resource_user_shutdown_done, tcp)); - } -} - static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { @@ -324,8 +310,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; uv_close((uv_handle_t *)tcp->handle, uv_close_callback); - uv_resource_user_maybe_shutdown(exec_ctx, tcp); - TCP_UNREF(tcp, "destroy"); + TCP_UNREF(exec_ctx, tcp, "destroy"); } static char *uv_get_peer(grpc_endpoint *ep) { @@ -335,15 +320,18 @@ static char *uv_get_peer(grpc_endpoint *ep) { static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } +static int uv_get_fd(grpc_endpoint *ep) { return -1; } + static grpc_endpoint_vtable vtable = { uv_endpoint_read, uv_endpoint_write, uv_get_workqueue, uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown, - uv_destroy, uv_get_resource_user, uv_get_peer}; + uv_destroy, uv_get_resource_user, uv_get_peer, + uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, @@ -364,8 +352,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; - tcp->resource_user_shutting_down = false; - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 1fb7edc2b1..d4613b674e 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -402,6 +402,8 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { return tcp->resource_user; } +static int win_get_fd(grpc_endpoint *ep) { return -1; } + static grpc_endpoint_vtable vtable = {win_read, win_write, win_get_workqueue, @@ -410,7 +412,8 @@ static grpc_endpoint_vtable vtable = {win_read, win_shutdown, win_destroy, win_get_resource_user, - win_get_peer}; + win_get_peer, + win_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_resource_quota *resource_quota, @@ -420,7 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, tcp->base.vtable = &vtable; tcp->socket = socket; gpr_mu_init(&tcp->mu); - gpr_ref_init(&tcp->refcount, 2); + gpr_ref_init(&tcp->refcount, 1); grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index 183f0eb930..e186d63683 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -95,6 +95,8 @@ static void pipe_destroy(grpc_wakeup_fd* fd_info) { static int pipe_check_availability(void) { grpc_wakeup_fd fd; + fd.read_fd = fd.write_fd = -1; + if (pipe_init(&fd) == GRPC_ERROR_NONE) { pipe_destroy(&fd); return 1; diff --git a/src/core/lib/json/json.c b/src/core/lib/json/json.c index 5b583a1f2e..48b13686d7 100644 --- a/src/core/lib/json/json.c +++ b/src/core/lib/json/json.c @@ -37,15 +37,15 @@ #include "src/core/lib/json/json.h" -grpc_json *grpc_json_create(grpc_json_type type) { - grpc_json *json = gpr_malloc(sizeof(*json)); +grpc_json* grpc_json_create(grpc_json_type type) { + grpc_json* json = gpr_malloc(sizeof(*json)); memset(json, 0, sizeof(*json)); json->type = type; return json; } -void grpc_json_destroy(grpc_json *json) { +void grpc_json_destroy(grpc_json* json) { while (json->child) { grpc_json_destroy(json->child); } diff --git a/src/core/lib/json/json.h b/src/core/lib/json/json.h index 681df4bb77..7111db0b52 100644 --- a/src/core/lib/json/json.h +++ b/src/core/lib/json/json.h @@ -42,14 +42,14 @@ * are not owned by it. */ typedef struct grpc_json { - struct grpc_json *next; - struct grpc_json *prev; - struct grpc_json *child; - struct grpc_json *parent; + struct grpc_json* next; + struct grpc_json* prev; + struct grpc_json* child; + struct grpc_json* parent; grpc_json_type type; - const char *key; - const char *value; + const char* key; + const char* value; } grpc_json; /* The next two functions are going to parse the input string, and @@ -65,8 +65,8 @@ typedef struct grpc_json { * * Delete the allocated tree afterward using grpc_json_destroy(). */ -grpc_json *grpc_json_parse_string_with_len(char *input, size_t size); -grpc_json *grpc_json_parse_string(char *input); +grpc_json* grpc_json_parse_string_with_len(char* input, size_t size); +grpc_json* grpc_json_parse_string(char* input); /* This function will create a new string using gpr_realloc, and will * deserialize the grpc_json tree into it. It'll be zero-terminated, @@ -76,13 +76,13 @@ grpc_json *grpc_json_parse_string(char *input); * If indent is 0, then newlines will be suppressed as well, and the * output will be condensed at its maximum. */ -char *grpc_json_dump_to_string(grpc_json *json, int indent); +char* grpc_json_dump_to_string(grpc_json* json, int indent); /* Use these to create or delete a grpc_json object. * Deletion is recursive. We will not attempt to free any of the strings * in any of the objects of that tree. */ -grpc_json *grpc_json_create(grpc_json_type type); -void grpc_json_destroy(grpc_json *json); +grpc_json* grpc_json_create(grpc_json_type type); +void grpc_json_destroy(grpc_json* json); #endif /* GRPC_CORE_LIB_JSON_JSON_H */ diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.c b/src/core/lib/security/credentials/jwt/jwt_credentials.c index f87ba0ce8d..3daf0f4ef7 100644 --- a/src/core/lib/security/credentials/jwt/jwt_credentials.c +++ b/src/core/lib/security/credentials/jwt/jwt_credentials.c @@ -144,17 +144,44 @@ grpc_service_account_jwt_access_credentials_create_from_auth_json_key( return &c->base; } +static char *redact_private_key(const char *json_key) { + char *json_copy = gpr_strdup(json_key); + grpc_json *json = grpc_json_parse_string(json_copy); + if (!json) { + gpr_free(json_copy); + return gpr_strdup("<Json failed to parse.>"); + } + const char *redacted = "<redacted>"; + grpc_json *current = json->child; + while (current) { + if (current->type == GRPC_JSON_STRING && + strcmp(current->key, "private_key") == 0) { + current->value = (char *)redacted; + break; + } + current = current->next; + } + char *clean_json = grpc_json_dump_to_string(json, 2); + gpr_free(json_copy); + grpc_json_destroy(json); + return clean_json; +} + grpc_call_credentials *grpc_service_account_jwt_access_credentials_create( const char *json_key, gpr_timespec token_lifetime, void *reserved) { - GRPC_API_TRACE( - "grpc_service_account_jwt_access_credentials_create(" - "json_key=%s, " - "token_lifetime=" - "gpr_timespec { tv_sec: %" PRId64 - ", tv_nsec: %d, clock_type: %d }, " - "reserved=%p)", - 5, (json_key, token_lifetime.tv_sec, token_lifetime.tv_nsec, - (int)token_lifetime.clock_type, reserved)); + if (grpc_api_trace) { + char *clean_json = redact_private_key(json_key); + gpr_log(GPR_INFO, + "grpc_service_account_jwt_access_credentials_create(" + "json_key=%s, " + "token_lifetime=" + "gpr_timespec { tv_sec: %" PRId64 + ", tv_nsec: %d, clock_type: %d }, " + "reserved=%p)", + clean_json, token_lifetime.tv_sec, token_lifetime.tv_nsec, + (int)token_lifetime.clock_type, reserved); + gpr_free(clean_json); + } GPR_ASSERT(reserved == NULL); return grpc_service_account_jwt_access_credentials_create_from_auth_json_key( grpc_auth_json_key_create_from_string(json_key), token_lifetime); diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index d980577c46..b3625b22c0 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -392,15 +392,32 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token( return &c->base.base; } +static char *create_loggable_refresh_token(grpc_auth_refresh_token *token) { + if (strcmp(token->type, GRPC_AUTH_JSON_TYPE_INVALID) == 0) { + return gpr_strdup("<Invalid json token>"); + } + char *loggable_token = NULL; + gpr_asprintf(&loggable_token, + "{\n type: %s\n client_id: %s\n client_secret: " + "<redacted>\n refresh_token: <redacted>\n}", + token->type, token->client_id); + return loggable_token; +} + grpc_call_credentials *grpc_google_refresh_token_credentials_create( const char *json_refresh_token, void *reserved) { - GRPC_API_TRACE( - "grpc_refresh_token_credentials_create(json_refresh_token=%s, " - "reserved=%p)", - 2, (json_refresh_token, reserved)); + grpc_auth_refresh_token token = + grpc_auth_refresh_token_create_from_string(json_refresh_token); + if (grpc_api_trace) { + char *loggable_token = create_loggable_refresh_token(&token); + gpr_log(GPR_INFO, + "grpc_refresh_token_credentials_create(json_refresh_token=%s, " + "reserved=%p)", + loggable_token, reserved); + gpr_free(loggable_token); + } GPR_ASSERT(reserved == NULL); - return grpc_refresh_token_credentials_create_from_auth_refresh_token( - grpc_auth_refresh_token_create_from_string(json_refresh_token)); + return grpc_refresh_token_credentials_create_from_auth_refresh_token(token); } // @@ -430,9 +447,9 @@ grpc_call_credentials *grpc_access_token_credentials_create( gpr_malloc(sizeof(grpc_access_token_credentials)); char *token_md_value; GRPC_API_TRACE( - "grpc_access_token_credentials_create(access_token=%s, " + "grpc_access_token_credentials_create(access_token=<redacted>, " "reserved=%p)", - 2, (access_token, reserved)); + 1, (reserved)); GPR_ASSERT(reserved == NULL); memset(c, 0, sizeof(grpc_access_token_credentials)); c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c index 61c10862da..5d950098a0 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.c +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c @@ -104,6 +104,8 @@ static void plugin_md_request_metadata_ready(void *request, grpc_slice_unref(md_array[i].value); } gpr_free(md_array); + } else if (num_md == 0) { + r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_OK, NULL); } } gpr_free(r); diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c index 01e7fab773..9623797610 100644 --- a/src/core/lib/security/transport/handshake.c +++ b/src/core/lib/security/transport/handshake.c @@ -125,7 +125,7 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx, h->auth_context); } else { const char *msg = grpc_error_string(error); - gpr_log(GPR_ERROR, "Security handshake failed: %s", msg); + gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg); grpc_error_free_string(msg); if (h->secure_endpoint != NULL) { diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index fba3314812..1b278410e8 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -31,7 +31,12 @@ * */ -#include "src/core/lib/security/transport/secure_endpoint.h" +/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when + using that endpoint. Because of various transitive includes in uv.h, + including windows.h on Windows, uv.h must be included before other system + headers. Therefore, sockaddr.h must always be included first */ +#include "src/core/lib/iomgr/sockaddr.h" + #include <grpc/slice.h> #include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> @@ -39,6 +44,7 @@ #include <grpc/support/sync.h> #include "src/core/lib/debug/trace.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/tsi_error.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" @@ -366,6 +372,8 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { return grpc_endpoint_get_peer(ep->wrapped_ep); } +static int endpoint_get_fd(grpc_endpoint *secure_ep) { return -1; } + static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; return grpc_endpoint_get_workqueue(ep->wrapped_ep); @@ -385,7 +393,8 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_shutdown, endpoint_destroy, endpoint_get_resource_user, - endpoint_get_peer}; + endpoint_get_peer, + endpoint_get_fd}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/lib/slice/slice.c b/src/core/lib/slice/slice.c index 3dac18df61..52977e6d9a 100644 --- a/src/core/lib/slice/slice.c +++ b/src/core/lib/slice/slice.c @@ -348,3 +348,11 @@ int grpc_slice_str_cmp(grpc_slice a, const char *b) { if (d != 0) return d; return memcmp(GRPC_SLICE_START_PTR(a), b, b_length); } + +int grpc_slice_is_equivalent(grpc_slice a, grpc_slice b) { + if (a.refcount == NULL || b.refcount == NULL) { + return grpc_slice_cmp(a, b) == 0; + } + return a.data.refcounted.length == b.data.refcounted.length && + a.data.refcounted.bytes == b.data.refcounted.bytes; +} diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index dc243bf0bf..f10a30f0fd 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -34,7 +34,9 @@ #include "src/core/lib/support/string.h" #include <ctype.h> +#include <limits.h> #include <stddef.h> +#include <stdlib.h> #include <string.h> #include <grpc/support/alloc.h> @@ -189,6 +191,13 @@ int int64_ttoa(int64_t value, char *string) { return i; } +int gpr_parse_nonnegative_int(const char *value) { + char *end; + long result = strtol(value, &end, 0); + if (*end != '\0' || result < 0 || result > INT_MAX) return -1; + return (int)result; +} + char *gpr_leftpad(const char *str, char flag, size_t length) { const size_t str_length = strlen(str); const size_t out_length = str_length > length ? str_length : length; diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h index 13891d0b9e..e933e2eb46 100644 --- a/src/core/lib/support/string.h +++ b/src/core/lib/support/string.h @@ -77,6 +77,9 @@ NOTE: This function ensures sufficient bit width even on Win x64, where long is 32bit is size.*/ int int64_ttoa(int64_t value, char *output); +// Parses a non-negative number from a value string. Returns -1 on error. +int gpr_parse_nonnegative_int(const char *value); + /* Reverse a run of bytes */ void gpr_reverse_bytes(char *str, int len); diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c index 4f4de9298e..4247a1c12b 100644 --- a/src/core/lib/support/subprocess_posix.c +++ b/src/core/lib/support/subprocess_posix.c @@ -40,6 +40,7 @@ #include <assert.h> #include <errno.h> #include <signal.h> +#include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -52,7 +53,7 @@ struct gpr_subprocess { int pid; - int joined; + bool joined; }; const char *gpr_subprocess_binary_extension() { return ""; } @@ -97,9 +98,11 @@ retry: if (errno == EINTR) { goto retry; } - gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); + gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid, + strerror(errno)); return -1; } + p->joined = true; return status; } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index eafcba22fc..1e0f3eeca5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -123,6 +123,7 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; + gpr_timespec start_time; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -240,6 +241,7 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, call->channel = args->channel; call->cq = args->cq; call->parent = args->parent_call; + call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; @@ -312,10 +314,10 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ - grpc_error *error = - grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, - call->context, args->server_transport_data, path, - send_deadline, CALL_STACK_FROM_CALL(call)); + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + args->server_transport_data, path, call->start_time, send_deadline, + CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { grpc_status_code status; const char *error_str; @@ -428,6 +430,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, get_final_status(call, set_status_value_directly, &c->final_info.final_status); + c->final_info.stats.latency = + gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); @@ -633,9 +637,6 @@ static int prepare_application_metadata(grpc_call *call, int count, if (call->send_extra_metadata_count == 0) { prepend_extra_metadata = 0; } else { - for (i = 0; i < call->send_extra_metadata_count; i++) { - GRPC_MDELEM_REF(call->send_extra_metadata[i].md); - } for (i = 1; i < call->send_extra_metadata_count; i++) { call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; } @@ -681,6 +682,7 @@ static int prepare_application_metadata(grpc_call *call, int count, &call->send_extra_metadata[call->send_extra_metadata_count - 1]; batch->list.head->prev = NULL; batch->list.tail->next = NULL; + call->send_extra_metadata_count = 0; break; case 3: { /* prepend AND md */ @@ -696,6 +698,7 @@ static int prepare_application_metadata(grpc_call *call, int count, batch->list.tail = linked_from_md(last_md); batch->list.head->prev = NULL; batch->list.tail->next = NULL; + call->send_extra_metadata_count = 0; break; } default: diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 89dd825460..fe73aa375c 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -195,6 +195,7 @@ struct grpc_server { grpc_completion_queue **cqs; grpc_pollset **pollsets; size_t cq_count; + size_t pollset_count; bool started; /* The two following mutexes control access to server-state @@ -1085,7 +1086,7 @@ void grpc_server_start(grpc_server *server) { GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); server->started = true; - size_t pollset_count = 0; + server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); server->request_freelist_per_cq = gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); @@ -1093,7 +1094,8 @@ void grpc_server_start(grpc_server *server) { gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { - server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); + server->pollsets[server->pollset_count++] = + grpc_cq_pollset(server->cqs[i]); } server->request_freelist_per_cq[i] = gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); @@ -1112,7 +1114,8 @@ void grpc_server_start(grpc_server *server) { } for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, pollset_count); + l->start(&exec_ctx, server, l->arg, server->pollsets, + server->pollset_count); } grpc_exec_ctx_finish(&exec_ctx); @@ -1120,7 +1123,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets, size_t *pollset_count) { - *pollset_count = server->cq_count; + *pollset_count = server->pollset_count; *pollsets = server->pollsets; } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index fdb5307814..4f49d7cf7d 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -43,6 +43,8 @@ int grpc_connectivity_state_trace = 0; const char *grpc_connectivity_state_name(grpc_connectivity_state state) { switch (state) { + case GRPC_CHANNEL_INIT: + return "INIT"; case GRPC_CHANNEL_IDLE: return "IDLE"; case GRPC_CHANNEL_CONNECTING: @@ -98,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check( return tracker->current_state; } -int grpc_connectivity_state_notify_on_state_change( +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker *connectivity_state) { + return connectivity_state->watchers != NULL; +} + +bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { @@ -117,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); - return 0; + return false; } while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; @@ -125,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); - return 0; + return false; } w = w->next; } - return 0; + return false; } else { if (tracker->current_state != *current) { *current = tracker->current_state; @@ -159,6 +166,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_error_free_string(error_string); } switch (state) { + case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 7a2fa52c10..769c675b79 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_error *associated_error, const char *reason); +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker *tracker); + grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker, grpc_error **current_error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that case) */ -int grpc_connectivity_state_notify_on_state_change( +bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); diff --git a/src/core/lib/transport/mdstr_hash_table.c b/src/core/lib/transport/mdstr_hash_table.c index 8e914c420b..3d01e56df7 100644 --- a/src/core/lib/transport/mdstr_hash_table.c +++ b/src/core/lib/transport/mdstr_hash_table.c @@ -41,7 +41,6 @@ struct grpc_mdstr_hash_table { gpr_refcount refs; - size_t num_entries; size_t size; grpc_mdstr_hash_table_entry* entries; }; @@ -77,7 +76,6 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_create( grpc_mdstr_hash_table* table = gpr_malloc(sizeof(*table)); memset(table, 0, sizeof(*table)); gpr_ref_init(&table->refs, 1); - table->num_entries = num_entries; // Quadratic probing gets best performance when the table is no more // than half full. table->size = num_entries * 2; @@ -96,7 +94,7 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table) { return table; } -int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) { +void grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) { if (table != NULL && gpr_unref(&table->refs)) { for (size_t i = 0; i < table->size; ++i) { grpc_mdstr_hash_table_entry* entry = &table->entries[i]; @@ -107,13 +105,7 @@ int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) { } gpr_free(table->entries); gpr_free(table); - return 1; } - return 0; -} - -size_t grpc_mdstr_hash_table_num_entries(const grpc_mdstr_hash_table* table) { - return table->num_entries; } void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, @@ -123,35 +115,3 @@ void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, if (idx == table->size) return NULL; // Not found. return table->entries[idx].value; } - -int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1, - const grpc_mdstr_hash_table* table2) { - // Compare by num_entries. - if (table1->num_entries < table2->num_entries) return -1; - if (table1->num_entries > table2->num_entries) return 1; - for (size_t i = 0; i < table1->num_entries; ++i) { - grpc_mdstr_hash_table_entry* e1 = &table1->entries[i]; - grpc_mdstr_hash_table_entry* e2 = &table2->entries[i]; - // Compare keys by hash value. - if (e1->key->hash < e2->key->hash) return -1; - if (e1->key->hash > e2->key->hash) return 1; - // Compare by vtable (pointer equality). - if (e1->vtable < e2->vtable) return -1; - if (e1->vtable > e2->vtable) return 1; - // Compare values via vtable. - const int value_result = e1->vtable->compare_value(e1->value, e2->value); - if (value_result != 0) return value_result; - } - return 0; -} - -void grpc_mdstr_hash_table_iterate( - const grpc_mdstr_hash_table* table, - void (*func)(const grpc_mdstr_hash_table_entry* entry, void* user_data), - void* user_data) { - for (size_t i = 0; i < table->size; ++i) { - if (table->entries[i].key != NULL) { - func(&table->entries[i], user_data); - } - } -} diff --git a/src/core/lib/transport/mdstr_hash_table.h b/src/core/lib/transport/mdstr_hash_table.h index bceb4df93d..8982ec3a8d 100644 --- a/src/core/lib/transport/mdstr_hash_table.h +++ b/src/core/lib/transport/mdstr_hash_table.h @@ -51,7 +51,6 @@ typedef struct grpc_mdstr_hash_table grpc_mdstr_hash_table; typedef struct grpc_mdstr_hash_table_vtable { void (*destroy_value)(void* value); void* (*copy_value)(void* value); - int (*compare_value)(void* value1, void* value2); } grpc_mdstr_hash_table_vtable; typedef struct grpc_mdstr_hash_table_entry { @@ -67,26 +66,11 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_create( size_t num_entries, grpc_mdstr_hash_table_entry* entries); grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table); -/** Returns 1 when \a table is destroyed. */ -int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table); - -/** Returns the number of entries in \a table. */ -size_t grpc_mdstr_hash_table_num_entries(const grpc_mdstr_hash_table* table); +void grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table); /** Returns the value from \a table associated with \a key. Returns NULL if \a key is not found. */ void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, const grpc_mdstr* key); -/** Compares two hash tables. - The sort order is stable but undefined. */ -int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1, - const grpc_mdstr_hash_table* table2); - -/** Iterates over the entries in \a table, calling \a func for each entry. */ -void grpc_mdstr_hash_table_iterate( - const grpc_mdstr_hash_table* table, - void (*func)(const grpc_mdstr_hash_table_entry* entry, void* user_data), - void* user_data); - #endif /* GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H */ diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 977b34ca86..fac19b91d9 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -728,8 +728,8 @@ void *grpc_mdelem_get_user_data(grpc_mdelem *md, void (*destroy_func)(void *)) { return result; } -void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), - void *user_data) { +void *grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), + void *user_data) { internal_metadata *im = (internal_metadata *)md; GPR_ASSERT(!is_mdelem_static(md)); GPR_ASSERT((user_data == NULL) == (destroy_func == NULL)); @@ -740,11 +740,12 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), if (destroy_func != NULL) { destroy_func(user_data); } - return; + return (void *)gpr_atm_no_barrier_load(&im->user_data); } gpr_atm_no_barrier_store(&im->user_data, (gpr_atm)user_data); gpr_atm_rel_store(&im->destroy_user_data, (gpr_atm)destroy_func); gpr_mu_unlock(&im->mu_user_data); + return user_data; } grpc_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) { diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 8dcfbb98bb..a4955a1ea7 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -120,8 +120,8 @@ size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem *elem); is used as a type tag and is checked during user_data fetch. */ void *grpc_mdelem_get_user_data(grpc_mdelem *md, void (*if_destroy_func)(void *)); -void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), - void *user_data); +void *grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), + void *user_data); /* Reference counting */ //#define GRPC_METADATA_REFCOUNT_DEBUG diff --git a/src/core/lib/transport/method_config.c b/src/core/lib/transport/method_config.c deleted file mode 100644 index 57d97700bf..0000000000 --- a/src/core/lib/transport/method_config.c +++ /dev/null @@ -1,340 +0,0 @@ -// -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#include "src/core/lib/transport/method_config.h" - -#include <string.h> - -#include <grpc/impl/codegen/grpc_types.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> - -#include "src/core/lib/transport/mdstr_hash_table.h" -#include "src/core/lib/transport/metadata.h" - -// -// grpc_method_config -// - -// bool vtable - -static void* bool_copy(void* valuep) { - bool value = *(bool*)valuep; - bool* new_value = gpr_malloc(sizeof(bool)); - *new_value = value; - return new_value; -} - -static int bool_cmp(void* v1, void* v2) { - bool b1 = *(bool*)v1; - bool b2 = *(bool*)v2; - if (!b1 && b2) return -1; - if (b1 && !b2) return 1; - return 0; -} - -static grpc_mdstr_hash_table_vtable bool_vtable = {gpr_free, bool_copy, - bool_cmp}; - -// timespec vtable - -static void* timespec_copy(void* valuep) { - gpr_timespec value = *(gpr_timespec*)valuep; - gpr_timespec* new_value = gpr_malloc(sizeof(gpr_timespec)); - *new_value = value; - return new_value; -} - -static int timespec_cmp(void* v1, void* v2) { - return gpr_time_cmp(*(gpr_timespec*)v1, *(gpr_timespec*)v2); -} - -static grpc_mdstr_hash_table_vtable timespec_vtable = {gpr_free, timespec_copy, - timespec_cmp}; - -// int32 vtable - -static void* int32_copy(void* valuep) { - int32_t value = *(int32_t*)valuep; - int32_t* new_value = gpr_malloc(sizeof(int32_t)); - *new_value = value; - return new_value; -} - -static int int32_cmp(void* v1, void* v2) { - int32_t i1 = *(int32_t*)v1; - int32_t i2 = *(int32_t*)v2; - if (i1 < i2) return -1; - if (i1 > i2) return 1; - return 0; -} - -static grpc_mdstr_hash_table_vtable int32_vtable = {gpr_free, int32_copy, - int32_cmp}; - -// Hash table keys. -#define GRPC_METHOD_CONFIG_WAIT_FOR_READY "grpc.wait_for_ready" // bool -#define GRPC_METHOD_CONFIG_TIMEOUT "grpc.timeout" // gpr_timespec -#define GRPC_METHOD_CONFIG_MAX_REQUEST_MESSAGE_BYTES \ - "grpc.max_request_message_bytes" // int32 -#define GRPC_METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES \ - "grpc.max_response_message_bytes" // int32 - -struct grpc_method_config { - grpc_mdstr_hash_table* table; - grpc_mdstr* wait_for_ready_key; - grpc_mdstr* timeout_key; - grpc_mdstr* max_request_message_bytes_key; - grpc_mdstr* max_response_message_bytes_key; -}; - -grpc_method_config* grpc_method_config_create( - bool* wait_for_ready, gpr_timespec* timeout, - int32_t* max_request_message_bytes, int32_t* max_response_message_bytes) { - grpc_method_config* method_config = gpr_malloc(sizeof(grpc_method_config)); - memset(method_config, 0, sizeof(grpc_method_config)); - method_config->wait_for_ready_key = - grpc_mdstr_from_string(GRPC_METHOD_CONFIG_WAIT_FOR_READY); - method_config->timeout_key = - grpc_mdstr_from_string(GRPC_METHOD_CONFIG_TIMEOUT); - method_config->max_request_message_bytes_key = - grpc_mdstr_from_string(GRPC_METHOD_CONFIG_MAX_REQUEST_MESSAGE_BYTES); - method_config->max_response_message_bytes_key = - grpc_mdstr_from_string(GRPC_METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES); - grpc_mdstr_hash_table_entry entries[4]; - size_t num_entries = 0; - if (wait_for_ready != NULL) { - entries[num_entries].key = method_config->wait_for_ready_key; - entries[num_entries].value = wait_for_ready; - entries[num_entries].vtable = &bool_vtable; - ++num_entries; - } - if (timeout != NULL) { - entries[num_entries].key = method_config->timeout_key; - entries[num_entries].value = timeout; - entries[num_entries].vtable = ×pec_vtable; - ++num_entries; - } - if (max_request_message_bytes != NULL) { - entries[num_entries].key = method_config->max_request_message_bytes_key; - entries[num_entries].value = max_request_message_bytes; - entries[num_entries].vtable = &int32_vtable; - ++num_entries; - } - if (max_response_message_bytes != NULL) { - entries[num_entries].key = method_config->max_response_message_bytes_key; - entries[num_entries].value = max_response_message_bytes; - entries[num_entries].vtable = &int32_vtable; - ++num_entries; - } - method_config->table = grpc_mdstr_hash_table_create(num_entries, entries); - return method_config; -} - -grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config) { - grpc_mdstr_hash_table_ref(method_config->table); - return method_config; -} - -void grpc_method_config_unref(grpc_method_config* method_config) { - if (grpc_mdstr_hash_table_unref(method_config->table)) { - GRPC_MDSTR_UNREF(method_config->wait_for_ready_key); - GRPC_MDSTR_UNREF(method_config->timeout_key); - GRPC_MDSTR_UNREF(method_config->max_request_message_bytes_key); - GRPC_MDSTR_UNREF(method_config->max_response_message_bytes_key); - gpr_free(method_config); - } -} - -int grpc_method_config_cmp(const grpc_method_config* method_config1, - const grpc_method_config* method_config2) { - return grpc_mdstr_hash_table_cmp(method_config1->table, - method_config2->table); -} - -const bool* grpc_method_config_get_wait_for_ready( - const grpc_method_config* method_config) { - return grpc_mdstr_hash_table_get(method_config->table, - method_config->wait_for_ready_key); -} - -const gpr_timespec* grpc_method_config_get_timeout( - const grpc_method_config* method_config) { - return grpc_mdstr_hash_table_get(method_config->table, - method_config->timeout_key); -} - -const int32_t* grpc_method_config_get_max_request_message_bytes( - const grpc_method_config* method_config) { - return grpc_mdstr_hash_table_get( - method_config->table, method_config->max_request_message_bytes_key); -} - -const int32_t* grpc_method_config_get_max_response_message_bytes( - const grpc_method_config* method_config) { - return grpc_mdstr_hash_table_get( - method_config->table, method_config->max_response_message_bytes_key); -} - -// -// grpc_method_config_table -// - -static void method_config_unref(void* valuep) { - grpc_method_config_unref(valuep); -} - -static void* method_config_ref(void* valuep) { - return grpc_method_config_ref(valuep); -} - -static int method_config_cmp(void* valuep1, void* valuep2) { - return grpc_method_config_cmp(valuep1, valuep2); -} - -static const grpc_mdstr_hash_table_vtable method_config_table_vtable = { - method_config_unref, method_config_ref, method_config_cmp}; - -grpc_method_config_table* grpc_method_config_table_create( - size_t num_entries, grpc_method_config_table_entry* entries) { - grpc_mdstr_hash_table_entry* hash_table_entries = - gpr_malloc(sizeof(grpc_mdstr_hash_table_entry) * num_entries); - for (size_t i = 0; i < num_entries; ++i) { - hash_table_entries[i].key = entries[i].method_name; - hash_table_entries[i].value = entries[i].method_config; - hash_table_entries[i].vtable = &method_config_table_vtable; - } - grpc_method_config_table* method_config_table = - grpc_mdstr_hash_table_create(num_entries, hash_table_entries); - gpr_free(hash_table_entries); - return method_config_table; -} - -grpc_method_config_table* grpc_method_config_table_ref( - grpc_method_config_table* table) { - return grpc_mdstr_hash_table_ref(table); -} - -void grpc_method_config_table_unref(grpc_method_config_table* table) { - grpc_mdstr_hash_table_unref(table); -} - -int grpc_method_config_table_cmp(const grpc_method_config_table* table1, - const grpc_method_config_table* table2) { - return grpc_mdstr_hash_table_cmp(table1, table2); -} - -void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table, - const grpc_mdstr* path) { - void* value = grpc_mdstr_hash_table_get(table, path); - // If we didn't find a match for the path, try looking for a wildcard - // entry (i.e., change "/service/method" to "/service/*"). - if (value == NULL) { - const char* path_str = grpc_mdstr_as_c_string(path); - const char* sep = strrchr(path_str, '/') + 1; - const size_t len = (size_t)(sep - path_str); - char* buf = gpr_malloc(len + 2); // '*' and NUL - memcpy(buf, path_str, len); - buf[len] = '*'; - buf[len + 1] = '\0'; - grpc_mdstr* wildcard_path = grpc_mdstr_from_string(buf); - gpr_free(buf); - value = grpc_mdstr_hash_table_get(table, wildcard_path); - GRPC_MDSTR_UNREF(wildcard_path); - } - return value; -} - -static void* copy_arg(void* p) { return grpc_method_config_table_ref(p); } - -static void destroy_arg(void* p) { grpc_method_config_table_unref(p); } - -static int cmp_arg(void* p1, void* p2) { - return grpc_method_config_table_cmp(p1, p2); -} - -static grpc_arg_pointer_vtable arg_vtable = {copy_arg, destroy_arg, cmp_arg}; - -grpc_arg grpc_method_config_table_create_channel_arg( - grpc_method_config_table* table) { - grpc_arg arg; - arg.type = GRPC_ARG_POINTER; - arg.key = GRPC_ARG_SERVICE_CONFIG; - arg.value.pointer.p = table; - arg.value.pointer.vtable = &arg_vtable; - return arg; -} - -// State used by convert_entry() below. -typedef struct conversion_state { - void* (*convert_value)(const grpc_method_config* method_config); - const grpc_mdstr_hash_table_vtable* vtable; - size_t num_entries; - grpc_mdstr_hash_table_entry* entries; -} conversion_state; - -// A function to be passed to grpc_mdstr_hash_table_iterate() to create -// a copy of the entries. -static void convert_entry(const grpc_mdstr_hash_table_entry* entry, - void* user_data) { - conversion_state* state = user_data; - state->entries[state->num_entries].key = GRPC_MDSTR_REF(entry->key); - state->entries[state->num_entries].value = state->convert_value(entry->value); - state->entries[state->num_entries].vtable = state->vtable; - ++state->num_entries; -} - -grpc_mdstr_hash_table* grpc_method_config_table_convert( - const grpc_method_config_table* table, - void* (*convert_value)(const grpc_method_config* method_config), - const grpc_mdstr_hash_table_vtable* vtable) { - // Create an array of the entries in the table with converted values. - conversion_state state; - state.convert_value = convert_value; - state.vtable = vtable; - state.num_entries = 0; - state.entries = gpr_malloc(sizeof(grpc_mdstr_hash_table_entry) * - grpc_mdstr_hash_table_num_entries(table)); - grpc_mdstr_hash_table_iterate(table, convert_entry, &state); - // Create a new table based on the array we just constructed. - grpc_mdstr_hash_table* new_table = - grpc_mdstr_hash_table_create(state.num_entries, state.entries); - // Clean up the array. - for (size_t i = 0; i < state.num_entries; ++i) { - GRPC_MDSTR_UNREF(state.entries[i].key); - vtable->destroy_value(state.entries[i].value); - } - gpr_free(state.entries); - // Return the new table. - return new_table; -} diff --git a/src/core/lib/transport/method_config.h b/src/core/lib/transport/method_config.h deleted file mode 100644 index 58fedd9436..0000000000 --- a/src/core/lib/transport/method_config.h +++ /dev/null @@ -1,136 +0,0 @@ -// -// Copyright 2016, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - -#ifndef GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H -#define GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H - -#include <stdbool.h> - -#include <grpc/impl/codegen/gpr_types.h> -#include <grpc/impl/codegen/grpc_types.h> - -#include "src/core/lib/transport/mdstr_hash_table.h" -#include "src/core/lib/transport/metadata.h" - -/// Per-method configuration. -typedef struct grpc_method_config grpc_method_config; - -/// Creates a grpc_method_config with the specified parameters. -/// Any parameter may be NULL to indicate that the value is unset. -/// -/// \a wait_for_ready indicates whether the client should wait until the -/// request deadline for the channel to become ready, even if there is a -/// temporary failure before the deadline while attempting to connect. -/// -/// \a timeout indicates the timeout for calls. -/// -/// \a max_request_message_bytes and \a max_response_message_bytes -/// indicate the maximum sizes of the request (checked when sending) and -/// response (checked when receiving) messages. -grpc_method_config* grpc_method_config_create( - bool* wait_for_ready, gpr_timespec* timeout, - int32_t* max_request_message_bytes, int32_t* max_response_message_bytes); - -grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config); -void grpc_method_config_unref(grpc_method_config* method_config); - -/// Compares two grpc_method_configs. -/// The sort order is stable but undefined. -int grpc_method_config_cmp(const grpc_method_config* method_config1, - const grpc_method_config* method_config2); - -/// These methods return NULL if the requested field is unset. -/// The caller does NOT take ownership of the result. -const bool* grpc_method_config_get_wait_for_ready( - const grpc_method_config* method_config); -const gpr_timespec* grpc_method_config_get_timeout( - const grpc_method_config* method_config); -const int32_t* grpc_method_config_get_max_request_message_bytes( - const grpc_method_config* method_config); -const int32_t* grpc_method_config_get_max_response_message_bytes( - const grpc_method_config* method_config); - -/// A table of method configs. -typedef grpc_mdstr_hash_table grpc_method_config_table; - -typedef struct grpc_method_config_table_entry { - /// The name is of one of the following forms: - /// service/method -- specifies exact service and method name - /// service/* -- matches all methods for the specified service - grpc_mdstr* method_name; - grpc_method_config* method_config; -} grpc_method_config_table_entry; - -/// Takes new references to all keys and values in \a entries. -grpc_method_config_table* grpc_method_config_table_create( - size_t num_entries, grpc_method_config_table_entry* entries); - -grpc_method_config_table* grpc_method_config_table_ref( - grpc_method_config_table* table); -void grpc_method_config_table_unref(grpc_method_config_table* table); - -/// Compares two grpc_method_config_tables. -/// The sort order is stable but undefined. -int grpc_method_config_table_cmp(const grpc_method_config_table* table1, - const grpc_method_config_table* table2); - -/// Gets the method config for the specified \a path, which should be of -/// the form "/service/method". -/// Returns NULL if the method has no config. -/// Caller does NOT own a reference to the result. -/// -/// Note: This returns a void* instead of a grpc_method_config* so that -/// it can also be used for tables constructed via -/// grpc_method_config_table_convert(). -void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table, - const grpc_mdstr* path); - -/// Returns a channel arg containing \a table. -grpc_arg grpc_method_config_table_create_channel_arg( - grpc_method_config_table* table); - -/// Generates a new table from \a table whose values are converted to a -/// new form via the \a convert_value function. The new table will use -/// \a vtable for its values. -/// -/// This is generally used to convert the table's value type from -/// grpc_method_config to a simple struct containing only the parameters -/// relevant to a particular filter, thus avoiding the need for a hash -/// table lookup on the fast path. In that scenario, \a convert_value -/// will return a new instance of the struct containing the values from -/// the grpc_method_config, and \a vtable provides the methods for -/// operating on the struct type. -grpc_mdstr_hash_table* grpc_method_config_table_convert( - const grpc_method_config_table* table, - void* (*convert_value)(const grpc_method_config* method_config), - const grpc_mdstr_hash_table_vtable* vtable); - -#endif /* GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H */ diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c new file mode 100644 index 0000000000..2e2b59e3f7 --- /dev/null +++ b/src/core/lib/transport/service_config.c @@ -0,0 +1,249 @@ +// +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/lib/transport/service_config.h" + +#include <string.h> + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/json/json.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/mdstr_hash_table.h" + +// The main purpose of the code here is to parse the service config in +// JSON form, which will look like this: +// +// { +// "loadBalancingPolicy": "string", // optional +// "methodConfig": [ // array of one or more method_config objects +// { +// "name": [ // array of one or more name objects +// { +// "service": "string", // required +// "method": "string", // optional +// } +// ], +// // remaining fields are optional. +// // see https://developers.google.com/protocol-buffers/docs/proto3#json +// // for format details. +// "waitForReady": bool, +// "timeout": "duration_string", +// "maxRequestMessageBytes": "int64_string", +// "maxResponseMessageBytes": "int64_string", +// } +// ] +// } + +struct grpc_service_config { + char* json_string; // Underlying storage for json_tree. + grpc_json* json_tree; +}; + +grpc_service_config* grpc_service_config_create(const char* json_string) { + grpc_service_config* service_config = gpr_malloc(sizeof(*service_config)); + service_config->json_string = gpr_strdup(json_string); + service_config->json_tree = + grpc_json_parse_string(service_config->json_string); + if (service_config->json_tree == NULL) { + gpr_log(GPR_INFO, "failed to parse JSON for service config"); + gpr_free(service_config->json_string); + gpr_free(service_config); + return NULL; + } + return service_config; +} + +void grpc_service_config_destroy(grpc_service_config* service_config) { + grpc_json_destroy(service_config->json_tree); + gpr_free(service_config->json_string); + gpr_free(service_config); +} + +const char* grpc_service_config_get_lb_policy_name( + const grpc_service_config* service_config) { + const grpc_json* json = service_config->json_tree; + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL; + const char* lb_policy_name = NULL; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return NULL; + if (strcmp(field->key, "loadBalancingPolicy") == 0) { + if (lb_policy_name != NULL) return NULL; // Duplicate. + if (field->type != GRPC_JSON_STRING) return NULL; + lb_policy_name = field->value; + } + } + return lb_policy_name; +} + +// Returns the number of names specified in the method config \a json. +static size_t count_names_in_method_config_json(grpc_json* json) { + size_t num_names = 0; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key != NULL && strcmp(field->key, "name") == 0) ++num_names; + } + return num_names; +} + +// Returns a path string for the JSON name object specified by \a json. +// Returns NULL on error. Caller takes ownership of result. +static char* parse_json_method_name(grpc_json* json) { + if (json->type != GRPC_JSON_OBJECT) return NULL; + const char* service_name = NULL; + const char* method_name = NULL; + for (grpc_json* child = json->child; child != NULL; child = child->next) { + if (child->key == NULL) return NULL; + if (child->type != GRPC_JSON_STRING) return NULL; + if (strcmp(child->key, "service") == 0) { + if (service_name != NULL) return NULL; // Duplicate. + if (child->value == NULL) return NULL; + service_name = child->value; + } else if (strcmp(child->key, "method") == 0) { + if (method_name != NULL) return NULL; // Duplicate. + if (child->value == NULL) return NULL; + method_name = child->value; + } + } + if (service_name == NULL) return NULL; // Required field. + char* path; + gpr_asprintf(&path, "/%s/%s", service_name, + method_name == NULL ? "*" : method_name); + return path; +} + +// Parses the method config from \a json. Adds an entry to \a entries for +// each name found, incrementing \a idx for each entry added. +// Returns false on error. +static bool parse_json_method_config( + grpc_json* json, void* (*create_value)(const grpc_json* method_config_json), + const grpc_mdstr_hash_table_vtable* vtable, + grpc_mdstr_hash_table_entry* entries, size_t* idx) { + // Construct value. + void* method_config = create_value(json); + if (method_config == NULL) return false; + // Construct list of paths. + bool success = false; + gpr_strvec paths; + gpr_strvec_init(&paths); + for (grpc_json* child = json->child; child != NULL; child = child->next) { + if (child->key == NULL) continue; + if (strcmp(child->key, "name") == 0) { + if (child->type != GRPC_JSON_ARRAY) goto done; + for (grpc_json* name = child->child; name != NULL; name = name->next) { + char* path = parse_json_method_name(name); + gpr_strvec_add(&paths, path); + } + } + } + if (paths.count == 0) goto done; // No names specified. + // Add entry for each path. + for (size_t i = 0; i < paths.count; ++i) { + entries[*idx].key = grpc_mdstr_from_string(paths.strs[i]); + entries[*idx].value = vtable->copy_value(method_config); + entries[*idx].vtable = vtable; + ++*idx; + } + success = true; +done: + vtable->destroy_value(method_config); + gpr_strvec_destroy(&paths); + return success; +} + +grpc_mdstr_hash_table* grpc_service_config_create_method_config_table( + const grpc_service_config* service_config, + void* (*create_value)(const grpc_json* method_config_json), + const grpc_mdstr_hash_table_vtable* vtable) { + const grpc_json* json = service_config->json_tree; + // Traverse parsed JSON tree. + if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL; + size_t num_entries = 0; + grpc_mdstr_hash_table_entry* entries = NULL; + for (grpc_json* field = json->child; field != NULL; field = field->next) { + if (field->key == NULL) return NULL; + if (strcmp(field->key, "methodConfig") == 0) { + if (entries != NULL) return NULL; // Duplicate. + if (field->type != GRPC_JSON_ARRAY) return NULL; + // Find number of entries. + for (grpc_json* method = field->child; method != NULL; + method = method->next) { + num_entries += count_names_in_method_config_json(method); + } + // Populate method config table entries. + entries = gpr_malloc(num_entries * sizeof(grpc_mdstr_hash_table_entry)); + size_t idx = 0; + for (grpc_json* method = field->child; method != NULL; + method = method->next) { + if (!parse_json_method_config(method, create_value, vtable, entries, + &idx)) { + return NULL; + } + } + GPR_ASSERT(idx == num_entries); + } + } + // Instantiate method config table. + grpc_mdstr_hash_table* method_config_table = NULL; + if (entries != NULL) { + method_config_table = grpc_mdstr_hash_table_create(num_entries, entries); + // Clean up. + for (size_t i = 0; i < num_entries; ++i) { + GRPC_MDSTR_UNREF(entries[i].key); + vtable->destroy_value(entries[i].value); + } + gpr_free(entries); + } + return method_config_table; +} + +void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table, + const grpc_mdstr* path) { + void* value = grpc_mdstr_hash_table_get(table, path); + // If we didn't find a match for the path, try looking for a wildcard + // entry (i.e., change "/service/method" to "/service/*"). + if (value == NULL) { + const char* path_str = grpc_mdstr_as_c_string(path); + const char* sep = strrchr(path_str, '/') + 1; + const size_t len = (size_t)(sep - path_str); + char* buf = gpr_malloc(len + 2); // '*' and NUL + memcpy(buf, path_str, len); + buf[len] = '*'; + buf[len + 1] = '\0'; + grpc_mdstr* wildcard_path = grpc_mdstr_from_string(buf); + gpr_free(buf); + value = grpc_mdstr_hash_table_get(table, wildcard_path); + GRPC_MDSTR_UNREF(wildcard_path); + } + return value; +} diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h new file mode 100644 index 0000000000..2ffe475193 --- /dev/null +++ b/src/core/lib/transport/service_config.h @@ -0,0 +1,70 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H +#define GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/json/json.h" +#include "src/core/lib/transport/mdstr_hash_table.h" + +typedef struct grpc_service_config grpc_service_config; + +grpc_service_config* grpc_service_config_create(const char* json_string); +void grpc_service_config_destroy(grpc_service_config* service_config); + +/// Gets the LB policy name from \a service_config. +/// Returns NULL if no LB policy name was specified. +/// Caller does NOT take ownership. +const char* grpc_service_config_get_lb_policy_name( + const grpc_service_config* service_config); + +/// Creates a method config table based on the data in \a json. +/// The table's keys are request paths. The table's value type is +/// returned by \a create_value(), based on data parsed from the JSON tree. +/// \a vtable provides methods used to manage the values. +/// Returns NULL on error. +grpc_mdstr_hash_table* grpc_service_config_create_method_config_table( + const grpc_service_config* service_config, + void* (*create_value)(const grpc_json* method_config_json), + const grpc_mdstr_hash_table_vtable* vtable); + +/// A helper function for looking up values in the table returned by +/// \a grpc_service_config_create_method_config_table(). +/// Gets the method config for the specified \a path, which should be of +/// the form "/service/method". +/// Returns NULL if the method has no config. +/// Caller does NOT own a reference to the result. +void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table, + const grpc_mdstr* path); + +#endif /* GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H */ diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 866cd9ea87..b448126da8 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -160,6 +160,11 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, return transport->vtable->get_peer(exec_ctx, transport); } +grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *transport) { + return transport->vtable->get_endpoint(exec_ctx, transport); +} + void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 8916b28b72..96c26749c8 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -37,6 +37,7 @@ #include <stddef.h> #include "src/core/lib/channel/context.h" +#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -295,6 +296,10 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *transport); +/* Get the endpoint used by \a transport */ +grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, + grpc_transport *transport); + /* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to \a on_consumed and then delete the returned transport op */ grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index fc7140671b..8553148c35 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -74,6 +74,9 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_get_peer */ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self); + + /* implementation of grpc_transport_get_endpoint */ + grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self); } grpc_transport_vtable; /* an instance of a grpc transport */ |