diff options
author | Mark D. Roth <roth@google.com> | 2016-12-05 07:03:49 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-12-05 07:03:49 -0800 |
commit | 16eaa454154122a5db61cd8058794af6c5fe9077 (patch) | |
tree | bccded87d75d5fdadf2a01a6498f0b7c0cf92650 /src/core | |
parent | 977f5d4e7dc8bdf21bda2e8b9a7a232f88de8e73 (diff) | |
parent | 8e9a492ad910ad66751fc1d0015e4ef051b62ddc (diff) |
Merge remote-tracking branch 'upstream/master' into client_channel_init_cleanup
Diffstat (limited to 'src/core')
28 files changed, 1873 insertions, 1641 deletions
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 82042897b2..572af52dfd 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -41,9 +41,9 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_channel/uri_parser.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" typedef struct http_connect_handshaker { @@ -53,27 +53,38 @@ typedef struct http_connect_handshaker { char* proxy_server; char* server_name; + gpr_refcount refcount; + gpr_mu mu; + + bool shutdown; + // Endpoint and read buffer to destroy after a shutdown. + grpc_endpoint* endpoint_to_destroy; + grpc_slice_buffer* read_buffer_to_destroy; + // State saved while performing the handshake. - grpc_endpoint* endpoint; - grpc_channel_args* args; - grpc_handshaker_done_cb cb; - void* user_data; + grpc_handshaker_args* args; + grpc_closure* on_handshake_done; // Objects for processing the HTTP CONNECT request and response. grpc_slice_buffer write_buffer; - grpc_slice_buffer* read_buffer; // Ownership passes through this object. grpc_closure request_done_closure; grpc_closure response_read_closure; grpc_http_parser http_parser; grpc_http_response http_response; - grpc_timer timeout_timer; - - gpr_refcount refcount; } http_connect_handshaker; // Unref and clean up handshaker. -static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { +static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, + http_connect_handshaker* handshaker) { if (gpr_unref(&handshaker->refcount)) { + gpr_mu_destroy(&handshaker->mu); + if (handshaker->endpoint_to_destroy != NULL) { + grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy); + } + if (handshaker->read_buffer_to_destroy != NULL) { + grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy); + gpr_free(handshaker->read_buffer_to_destroy); + } gpr_free(handshaker->proxy_server); gpr_free(handshaker->server_name); grpc_slice_buffer_destroy(&handshaker->write_buffer); @@ -83,28 +94,64 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { } } -// Callback invoked when deadline is exceeded. -static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - http_connect_handshaker* handshaker = arg; - if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); +// Set args fields to NULL, saving the endpoint and read buffer for +// later destruction. +static void cleanup_args_for_failure_locked( + http_connect_handshaker* handshaker) { + handshaker->endpoint_to_destroy = handshaker->args->endpoint; + handshaker->args->endpoint = NULL; + handshaker->read_buffer_to_destroy = handshaker->args->read_buffer; + handshaker->args->read_buffer = NULL; + grpc_channel_args_destroy(handshaker->args->args); + handshaker->args->args = NULL; +} + +// If the handshake failed or we're shutting down, clean up and invoke the +// callback with the error. +static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, + http_connect_handshaker* handshaker, + grpc_error* error) { + if (error == GRPC_ERROR_NONE) { + // If we were shut down after an endpoint operation succeeded but + // before the endpoint callback was invoked, we need to generate our + // own error. + error = GRPC_ERROR_CREATE("Handshaker shutdown"); } - http_connect_handshaker_unref(handshaker); + if (!handshaker->shutdown) { + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + // Not shutting down, so the handshake failed. Clean up before + // invoking the callback. + cleanup_args_for_failure_locked(handshaker); + // Set shutdown to true so that subsequent calls to + // http_connect_handshaker_shutdown() do nothing. + handshaker->shutdown = true; + } + // Invoke callback. + grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); } // Callback invoked when finished writing HTTP CONNECT request. static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; - if (error != GRPC_ERROR_NONE) { - // If the write failed, invoke the callback immediately with the error. - handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, - handshaker->read_buffer, handshaker->user_data, - GRPC_ERROR_REF(error)); + gpr_mu_lock(&handshaker->mu); + if (error != GRPC_ERROR_NONE || handshaker->shutdown) { + // If the write failed or we're shutting down, clean up and invoke the + // callback with the error. + handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&handshaker->mu); + http_connect_handshaker_unref(exec_ctx, handshaker); } else { // Otherwise, read the response. - grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + // The read callback inherits our ref to the handshaker. + grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, + handshaker->args->read_buffer, &handshaker->response_read_closure); + gpr_mu_unlock(&handshaker->mu); } } @@ -112,36 +159,40 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; - if (error != GRPC_ERROR_NONE) { - GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. + gpr_mu_lock(&handshaker->mu); + if (error != GRPC_ERROR_NONE || handshaker->shutdown) { + // If the read failed or we're shutting down, clean up and invoke the + // callback with the error. + handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error)); goto done; } // Add buffer to parser. - for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { - if (GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { + for (size_t i = 0; i < handshaker->args->read_buffer->count; ++i) { + if (GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i]) > 0) { size_t body_start_offset = 0; error = grpc_http_parser_parse(&handshaker->http_parser, - handshaker->read_buffer->slices[i], + handshaker->args->read_buffer->slices[i], &body_start_offset); - if (error != GRPC_ERROR_NONE) goto done; + if (error != GRPC_ERROR_NONE) { + handshake_failed_locked(exec_ctx, handshaker, error); + goto done; + } if (handshaker->http_parser.state == GRPC_HTTP_BODY) { - // We've gotten back a successul response, so stop the timeout timer. - grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); // Remove the data we've already read from the read buffer, // leaving only the leftover bytes (if any). grpc_slice_buffer tmp_buffer; grpc_slice_buffer_init(&tmp_buffer); if (body_start_offset < - GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i])) { grpc_slice_buffer_add( &tmp_buffer, - grpc_slice_split_tail(&handshaker->read_buffer->slices[i], + grpc_slice_split_tail(&handshaker->args->read_buffer->slices[i], body_start_offset)); } grpc_slice_buffer_addn(&tmp_buffer, - &handshaker->read_buffer->slices[i + 1], - handshaker->read_buffer->count - i - 1); - grpc_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); + &handshaker->args->read_buffer->slices[i + 1], + handshaker->args->read_buffer->count - i - 1); + grpc_slice_buffer_swap(handshaker->args->read_buffer, &tmp_buffer); grpc_slice_buffer_destroy(&tmp_buffer); break; } @@ -159,9 +210,11 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). if (handshaker->http_parser.state != GRPC_HTTP_BODY) { - grpc_slice_buffer_reset_and_unref(handshaker->read_buffer); - grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + grpc_slice_buffer_reset_and_unref(handshaker->args->read_buffer); + grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, + handshaker->args->read_buffer, &handshaker->response_read_closure); + gpr_mu_unlock(&handshaker->mu); return; } // Make sure we got a 2xx response. @@ -172,11 +225,17 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, handshaker->http_response.status); error = GRPC_ERROR_CREATE(msg); gpr_free(msg); + handshake_failed_locked(exec_ctx, handshaker, error); + goto done; } + // Success. Invoke handshake-done callback. + grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); done: - // Invoke handshake-done callback. - handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, - handshaker->read_buffer, handshaker->user_data, error); + // Set shutdown to true so that subsequent calls to + // http_connect_handshaker_shutdown() do nothing. + handshaker->shutdown = true; + gpr_mu_unlock(&handshaker->mu); + http_connect_handshaker_unref(exec_ctx, handshaker); } // @@ -186,25 +245,30 @@ done: static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) {} + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + gpr_mu_lock(&handshaker->mu); + if (!handshaker->shutdown) { + handshaker->shutdown = true; + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + cleanup_args_for_failure_locked(handshaker); + } + gpr_mu_unlock(&handshaker->mu); +} static void http_connect_handshaker_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, - grpc_endpoint* endpoint, grpc_channel_args* args, - grpc_slice_buffer* read_buffer, gpr_timespec deadline, - grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, - void* user_data) { + grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, + grpc_handshaker_args* args) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + gpr_mu_lock(&handshaker->mu); // Save state in the handshaker object. - handshaker->endpoint = endpoint; handshaker->args = args; - handshaker->cb = cb; - handshaker->user_data = user_data; - handshaker->read_buffer = read_buffer; + handshaker->on_handshake_done = on_handshake_done; // Send HTTP CONNECT request. gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", handshaker->server_name, handshaker->proxy_server); @@ -216,16 +280,14 @@ static void http_connect_handshaker_do_handshake( request.handshaker = &grpc_httpcli_plaintext; grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); - grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, - &handshaker->request_done_closure); - // Set timeout timer. The timer gets a reference to the handshaker. + // Take a new ref to be held by the write callback. gpr_ref(&handshaker->refcount); - grpc_timer_init(exec_ctx, &handshaker->timeout_timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer, + &handshaker->request_done_closure); + gpr_mu_unlock(&handshaker->mu); } -static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = { +static const grpc_handshaker_vtable http_connect_handshaker_vtable = { http_connect_handshaker_destroy, http_connect_handshaker_shutdown, http_connect_handshaker_do_handshake}; @@ -233,10 +295,11 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, const char* server_name) { GPR_ASSERT(proxy_server != NULL); GPR_ASSERT(server_name != NULL); - http_connect_handshaker* handshaker = - gpr_malloc(sizeof(http_connect_handshaker)); + http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker)); grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); + gpr_mu_init(&handshaker->mu); + gpr_ref_init(&handshaker->refcount, 1); handshaker->proxy_server = gpr_strdup(proxy_server); handshaker->server_name = gpr_strdup(server_name); grpc_slice_buffer_init(&handshaker->write_buffer); @@ -246,7 +309,6 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, handshaker); grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, &handshaker->http_response); - gpr_ref_init(&handshaker->refcount, 1); return &handshaker->base; } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 2ab07db30c..f294e69392 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -336,16 +336,18 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + int min_backoff_ms = GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (c->args) { for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { + "grpc.testing.fixed_reconnect_backoff_ms")) { GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); fixed_reconnect_backoff = true; - initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], - (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); + initial_backoff_ms = min_backoff_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -362,11 +364,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } } gpr_backoff_init( - &c->backoff_state, + &c->backoff_state, initial_backoff_ms, fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - initial_backoff_ms, max_backoff_ms); + min_backoff_ms, max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -643,9 +645,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); /* setup subchannel watching connected subchannel for changes; subchannel - ref - for connecting is donated - to the state watcher */ + ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 4262d2b9a4..df0db61c22 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -123,10 +123,11 @@ #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/static_metadata.h" -#define BACKOFF_MULTIPLIER 1.6 -#define BACKOFF_JITTER 0.2 -#define BACKOFF_MIN_SECONDS 10 -#define BACKOFF_MAX_SECONDS 60 +#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 +#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_GRPCLB_RECONNECT_JITTER 0.2 int grpc_lb_glb_trace = 0; @@ -1107,9 +1108,12 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) { grpc_closure_init(&glb_policy->lb_on_response_received, lb_on_response_received, glb_policy); - gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER, - BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, - BACKOFF_MAX_SECONDS * 1000); + gpr_backoff_init(&glb_policy->lb_call_backoff_state, + GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_GRPCLB_RECONNECT_JITTER, + GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); } static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 665439f360..15476f5792 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -46,10 +46,11 @@ #include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" -#define BACKOFF_MULTIPLIER 1.6 -#define BACKOFF_JITTER 0.2 -#define BACKOFF_MIN_SECONDS 1 -#define BACKOFF_MAX_SECONDS 120 +#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 +#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_DNS_RECONNECT_JITTER 0.2 typedef struct { /** base class: must be first */ @@ -269,8 +270,11 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, server_name_arg.value.string = (char *)path; r->channel_args = grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1); - gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, - BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); + gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_DNS_RECONNECT_JITTER, + GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); return &r->base; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c new file mode 100644 index 0000000000..213395c20f --- /dev/null +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -0,0 +1,261 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" + +#include <grpc/grpc.h> + +#include <string.h> + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/client_channel/connector.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/security/transport/security_connector.h" + +typedef struct { + grpc_connector base; + + gpr_mu mu; + gpr_refcount refs; + + bool shutdown; + + char *server_name; + grpc_chttp2_create_handshakers_func create_handshakers; + void *create_handshakers_user_data; + + grpc_closure *notify; + grpc_connect_in_args args; + grpc_connect_out_args *result; + grpc_closure initial_string_sent; + grpc_slice_buffer initial_string_buffer; + + grpc_endpoint *endpoint; // Non-NULL until handshaking starts. + + grpc_closure connected; + + grpc_handshake_manager *handshake_mgr; +} chttp2_connector; + +static void chttp2_connector_ref(grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_ref(&c->refs); +} + +static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, + grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + if (gpr_unref(&c->refs)) { + /* c->initial_string_buffer does not need to be destroyed */ + gpr_mu_destroy(&c->mu); + // If handshaking is not yet in progress, destroy the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint); + gpr_free(c->server_name); + gpr_free(c); + } +} + +static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, + grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_mu_lock(&c->mu); + c->shutdown = true; + if (c->handshake_mgr != NULL) { + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + } + // If handshaking is not yet in progress, shutdown the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + gpr_mu_unlock(&c->mu); +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + chttp2_connector *c = args->user_data; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + // We were shut down after handshaking completed successfully, so + // destroy the endpoint here. + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_channel_args_destroy(args->args); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + } else { + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, + args->read_buffer); + c->result->channel_args = args->args; + } + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); + c->handshake_mgr = NULL; + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, (grpc_connector *)c); +} + +static void start_handshake_locked(grpc_exec_ctx *exec_ctx, + chttp2_connector *c) { + c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, c->server_name)); + gpr_free(proxy_name); + } + if (c->create_handshakers != NULL) { + c->create_handshakers(exec_ctx, c->create_handshakers_user_data, + c->handshake_mgr); + } + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->endpoint = NULL; // Endpoint handed off to handshake manager. +} + +static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + chttp2_connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, arg); + } else { + start_handshake_locked(exec_ctx, c); + gpr_mu_unlock(&c->mu); + } +} + +static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + chttp2_connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, arg); + } else { + GPR_ASSERT(c->endpoint != NULL); + if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, + c); + grpc_slice_buffer_init(&c->initial_string_buffer); + grpc_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); + grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer, + &c->initial_string_sent); + } else { + start_handshake_locked(exec_ctx, c); + } + gpr_mu_unlock(&c->mu); + } +} + +static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, + grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_closure *notify) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->notify == NULL); + c->notify = notify; + c->args = *args; + c->result = result; + GPR_ASSERT(c->endpoint == NULL); + chttp2_connector_ref(con); // Ref taken for callback. + grpc_closure_init(&c->connected, connected, c); + grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, + args->interested_parties, args->channel_args, + args->addr, args->deadline); + gpr_mu_unlock(&c->mu); +} + +static const grpc_connector_vtable chttp2_connector_vtable = { + chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, + chttp2_connector_connect}; + +grpc_connector *grpc_chttp2_connector_create( + grpc_exec_ctx *exec_ctx, const char *server_name, + grpc_chttp2_create_handshakers_func create_handshakers, + void *create_handshakers_user_data) { + chttp2_connector *c = gpr_malloc(sizeof(*c)); + memset(c, 0, sizeof(*c)); + c->base.vtable = &chttp2_connector_vtable; + gpr_mu_init(&c->mu); + gpr_ref_init(&c->refs, 1); + c->server_name = gpr_strdup(server_name); + c->create_handshakers = create_handshakers; + c->create_handshakers_user_data = create_handshakers_user_data; + return &c->base; +} diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h new file mode 100644 index 0000000000..6c34ce1af1 --- /dev/null +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H + +#include "src/core/ext/client_channel/connector.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +typedef void (*grpc_chttp2_create_handshakers_func)( + grpc_exec_ctx* exec_ctx, void* user_data, + grpc_handshake_manager* handshake_mgr); + +/// If \a create_handshakers is non-NULL, it will be called with +/// \a create_handshakers_user_data to add handshakers. +grpc_connector* grpc_chttp2_connector_create( + grpc_exec_ctx* exec_ctx, const char* server_name, + grpc_chttp2_create_handshakers_func create_handshakers, + void* create_handshakers_user_data); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 9e0478feab..7325f9413d 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -33,137 +33,17 @@ #include <grpc/grpc.h> -#include <stdlib.h> #include <string.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/compress_filter.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_client_filter.h" -#include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -// -// connector -// - -typedef struct { - grpc_connector base; - gpr_refcount refs; - - grpc_closure *notify; - grpc_connect_in_args args; - grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; - - grpc_endpoint *tcp; - - grpc_closure connected; - - grpc_handshake_manager *handshake_mgr; -} connector; - -static void connector_ref(grpc_connector *con) { - connector *c = (connector *)con; - gpr_ref(&c->refs); -} - -static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ - grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); - gpr_free(c); - } -} - -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector_unref(exec_ctx, arg); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - connector *c = user_data; - if (error != GRPC_ERROR_NONE) { - grpc_channel_args_destroy(args); - gpr_free(read_buffer); - } else { - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); - GPR_ASSERT(c->result->transport); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, - read_buffer); - c->result->channel_args = args; - } - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); -} - -static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - connector *c = arg; - grpc_endpoint *tcp = c->tcp; - if (tcp != NULL) { - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - connector_ref(arg); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); - } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} - -static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, - const grpc_connect_in_args *args, - grpc_connect_out_args *result, - grpc_closure *notify) { - connector *c = (connector *)con; - GPR_ASSERT(c->notify == NULL); - GPR_ASSERT(notify->cb); - c->notify = notify; - c->args = *args; - c->result = result; - c->tcp = NULL; - grpc_closure_init(&c->connected, connected, c); - grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, - args->interested_parties, args->channel_args, - args->addr, args->deadline); -} - -static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_shutdown, connector_connect}; - -// -// client_channel_factory -// - static void client_channel_factory_ref( grpc_client_channel_factory *cc_factory) {} @@ -173,20 +53,11 @@ static void client_channel_factory_unref( static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - c->base.vtable = &connector_vtable; - gpr_ref_init(&c->refs, 1); - c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add( - c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name, args->server_name)); - gpr_free(proxy_name); - } - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); - grpc_connector_unref(exec_ctx, &c->base); + grpc_connector *connector = grpc_chttp2_connector_create( + exec_ctx, args->server_name, NULL /* create_handshakers */, + NULL /* user_data */); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_connector_unref(exec_ctx, connector); return s; } @@ -229,7 +100,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, GRPC_API_TRACE( "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3, (target, args, reserved)); - GPR_ASSERT(!reserved); + GPR_ASSERT(reserved == NULL); grpc_client_channel_factory *factory = (grpc_client_channel_factory *)&client_channel_factory; // Add channel args containing the server name and client channel factory. diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index be57f30bd0..63f2488088 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -33,195 +33,18 @@ #include <grpc/grpc.h> -#include <stdlib.h> #include <string.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/iomgr/tcp_client.h" -#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/auth_filters.h" +#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -#include "src/core/lib/tsi/transport_security_interface.h" - -// -// connector -// - -typedef struct { - grpc_connector base; - gpr_refcount refs; - - grpc_channel_security_connector *security_connector; - - grpc_closure *notify; - grpc_connect_in_args args; - grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; - - gpr_mu mu; - grpc_endpoint *connecting_endpoint; - grpc_endpoint *newly_connecting_endpoint; - - grpc_closure connected_closure; - - grpc_handshake_manager *handshake_mgr; - - // TODO(roth): Remove once we eliminate on_secure_handshake_done(). - grpc_channel_args *tmp_args; -} connector; - -static void connector_ref(grpc_connector *con) { - connector *c = (connector *)con; - gpr_ref(&c->refs); -} - -static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ - grpc_channel_args_destroy(c->tmp_args); - grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); - gpr_free(c); - } -} - -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - connector *c = arg; - gpr_mu_lock(&c->mu); - grpc_error *error = GRPC_ERROR_NONE; - if (c->connecting_endpoint == NULL) { - memset(c->result, 0, sizeof(*c->result)); - gpr_mu_unlock(&c->mu); - } else if (status != GRPC_SECURITY_OK) { - error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), - GRPC_ERROR_INT_SECURITY_STATUS, status); - memset(c->result, 0, sizeof(*c->result)); - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - } else { - grpc_arg auth_context_arg; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - c->result->transport = grpc_create_chttp2_transport( - exec_ctx, c->args.channel_args, secure_endpoint, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL); - auth_context_arg = grpc_auth_context_to_arg(auth_context); - c->result->channel_args = - grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); - } - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - connector *c = user_data; - c->tmp_args = args; - if (error != GRPC_ERROR_NONE) { - gpr_free(read_buffer); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); - } else { - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, endpoint, read_buffer, - c->args.deadline, on_secure_handshake_done, c); - } -} - -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector *c = arg; - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); -} - -static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - connector *c = arg; - grpc_endpoint *tcp = c->newly_connecting_endpoint; - if (tcp != NULL) { - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - c->connecting_endpoint = tcp; - gpr_mu_unlock(&c->mu); - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); - } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - grpc_endpoint *ep; - gpr_mu_lock(&c->mu); - ep = c->connecting_endpoint; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - if (ep) { - grpc_endpoint_shutdown(exec_ctx, ep); - } -} - -static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, - const grpc_connect_in_args *args, - grpc_connect_out_args *result, - grpc_closure *notify) { - connector *c = (connector *)con; - GPR_ASSERT(c->notify == NULL); - c->notify = notify; - c->args = *args; - c->result = result; - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - gpr_mu_unlock(&c->mu); - grpc_closure_init(&c->connected_closure, connected, c); - grpc_tcp_client_connect( - exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->channel_args, args->addr, args->deadline); -} - -static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_shutdown, connector_connect}; - -// -// client_channel_factory -// typedef struct { grpc_client_channel_factory base; @@ -245,26 +68,21 @@ static void client_channel_factory_unref( } } +static void create_handshakers(grpc_exec_ctx *exec_ctx, + void *security_connector, + grpc_handshake_manager *handshake_mgr) { + grpc_channel_security_connector_create_handshakers( + exec_ctx, security_connector, handshake_mgr); +} + static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { client_channel_factory *f = (client_channel_factory *)cc_factory; - connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - c->base.vtable = &connector_vtable; - c->security_connector = f->security_connector; - c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add( - c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name, args->server_name)); - gpr_free(proxy_name); - } - gpr_mu_init(&c->mu); - gpr_ref_init(&c->refs, 1); - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); - grpc_connector_unref(exec_ctx, &c->base); + grpc_connector *connector = grpc_chttp2_connector_create( + exec_ctx, args->server_name, create_handshakers, f->security_connector); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_connector_unref(exec_ctx, connector); return s; } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c new file mode 100644 index 0000000000..7795606e73 --- /dev/null +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -0,0 +1,356 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" + +#include <grpc/grpc.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/server.h" + +void grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr) { + if (handshaker_factory != NULL) { + handshaker_factory->vtable->create_handshakers( + exec_ctx, handshaker_factory, handshake_mgr); + } +} + +void grpc_chttp2_server_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory) { + if (handshaker_factory != NULL) { + handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); + } +} + + +typedef struct pending_handshake_manager_node { + grpc_handshake_manager *handshake_mgr; + struct pending_handshake_manager_node *next; +} pending_handshake_manager_node; + +typedef struct { + grpc_server *server; + grpc_tcp_server *tcp_server; + grpc_channel_args *args; + grpc_chttp2_server_handshaker_factory *handshaker_factory; + gpr_mu mu; + bool shutdown; + grpc_closure tcp_server_shutdown_complete; + grpc_closure *server_destroy_listener_done; + pending_handshake_manager_node *pending_handshake_mgrs; +} server_state; + +typedef struct { + server_state *server_state; + grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; +} server_connection_state; + +static void pending_handshake_manager_add_locked( + server_state *state, grpc_handshake_manager *handshake_mgr) { + pending_handshake_manager_node *node = gpr_malloc(sizeof(*node)); + node->handshake_mgr = handshake_mgr; + node->next = state->pending_handshake_mgrs; + state->pending_handshake_mgrs = node; +} + +static void pending_handshake_manager_remove_locked( + server_state *state, grpc_handshake_manager *handshake_mgr) { + pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs; + for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + if (node->handshake_mgr == handshake_mgr) { + *prev_node = node->next; + gpr_free(node); + break; + } + prev_node = &node->next; + } +} + +static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, + server_state *state) { + pending_handshake_manager_node *prev_node = NULL; + for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + gpr_free(prev_node); + prev_node = node; + } + gpr_free(prev_node); + state->pending_handshake_mgrs = NULL; +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + server_connection_state *connection_state = args->user_data; + gpr_mu_lock(&connection_state->server_state->mu); + if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { + const char *error_str = grpc_error_string(error); + gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); + grpc_error_free_string(error_str); + if (error == GRPC_ERROR_NONE) { + // We were shut down after handshaking completed successfully, so + // destroy the endpoint here. + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_channel_args_destroy(args->args); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } + } else { + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, args->args); + grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); + grpc_channel_args_destroy(args->args); + } + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); + gpr_mu_unlock(&connection_state->server_state->mu); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state); +} + +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, + grpc_tcp_server_acceptor *acceptor) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); + grpc_endpoint_destroy(exec_ctx, tcp); + return; + } + grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); + server_connection_state *connection_state = + gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = state; + connection_state->accepting_pollset = accepting_pollset; + connection_state->acceptor = acceptor; + connection_state->handshake_mgr = handshake_mgr; + grpc_chttp2_server_handshaker_factory_create_handshakers( + exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake( + exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline, + acceptor, on_handshake_done, connection_state); +} + +/* Server callback: start listening on our ports */ +static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, + void *arg, grpc_pollset **pollsets, + size_t pollset_count) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); +} + +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + server_state *state = arg; + /* ensure all threads have unlocked */ + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); + // Flush queued work before destroying handshaker factory, since that + // may do a synchronous unref. + grpc_exec_ctx_flush(exec_ctx); + grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, + state->handshaker_factory); + if (destroy_done != NULL) { + destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); + grpc_exec_ctx_flush(exec_ctx); + } + grpc_channel_args_destroy(state->args); + gpr_mu_destroy(&state->mu); + gpr_free(state); +} + +/* Server callback: destroy the tcp listener (so we don't generate further + callbacks) */ +static void server_destroy_listener(grpc_exec_ctx *exec_ctx, + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); +} + +grpc_error *grpc_chttp2_server_add_port( + grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, + grpc_channel_args *args, + grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) { + grpc_resolved_addresses *resolved = NULL; + grpc_tcp_server *tcp_server = NULL; + size_t i; + size_t count = 0; + int port_temp; + grpc_error *err = GRPC_ERROR_NONE; + server_state *state = NULL; + grpc_error **errors = NULL; + + *port_num = -1; + + /* resolve address */ + err = grpc_blocking_resolve_address(addr, "https", &resolved); + if (err != GRPC_ERROR_NONE) { + goto error; + } + state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); + err = + grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, + args, &tcp_server); + if (err != GRPC_ERROR_NONE) { + goto error; + } + + state->server = server; + state->tcp_server = tcp_server; + state->args = args; + state->handshaker_factory = handshaker_factory; + state->shutdown = true; + gpr_mu_init(&state->mu); + + const size_t naddrs = resolved->naddrs; + errors = gpr_malloc(sizeof(*errors) * naddrs); + for (i = 0; i < naddrs; i++) { + errors[i] = + grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp); + if (errors[i] == GRPC_ERROR_NONE) { + if (*port_num == -1) { + *port_num = port_temp; + } else { + GPR_ASSERT(*port_num == port_temp); + } + count++; + } + } + if (count == 0) { + char *msg; + gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", + naddrs); + err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); + gpr_free(msg); + goto error; + } else if (count != naddrs) { + char *msg; + gpr_asprintf(&msg, "Only %" PRIuPTR + " addresses added out of total %" PRIuPTR " resolved", + count, naddrs); + err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); + gpr_free(msg); + + const char *warning_message = grpc_error_string(err); + gpr_log(GPR_INFO, "WARNING: %s", warning_message); + grpc_error_free_string(warning_message); + /* we managed to bind some addresses: continue */ + } + grpc_resolved_addresses_destroy(resolved); + + /* Register with the server only upon success */ + grpc_server_add_listener(exec_ctx, server, state, server_start_listener, + server_destroy_listener); + goto done; + +/* Error path: cleanup and return */ +error: + GPR_ASSERT(err != GRPC_ERROR_NONE); + if (resolved) { + grpc_resolved_addresses_destroy(resolved); + } + if (tcp_server) { + grpc_tcp_server_unref(exec_ctx, tcp_server); + } else { + grpc_channel_args_destroy(args); + grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory); + gpr_free(state); + } + *port_num = 0; + +done: + if (errors != NULL) { + for (i = 0; i < naddrs; i++) { + GRPC_ERROR_UNREF(errors[i]); + } + gpr_free(errors); + } + return err; +} diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h new file mode 100644 index 0000000000..b1ff04bcbb --- /dev/null +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -0,0 +1,79 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +/// A server handshaker factory is used to create handshakers for server +/// connections. +typedef struct grpc_chttp2_server_handshaker_factory + grpc_chttp2_server_handshaker_factory; + +typedef struct { + void (*create_handshakers)( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr); + void (*destroy)(grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory); +} grpc_chttp2_server_handshaker_factory_vtable; + +struct grpc_chttp2_server_handshaker_factory { + const grpc_chttp2_server_handshaker_factory_vtable *vtable; +}; + +void grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr); + +void grpc_chttp2_server_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory); + +/// Adds a port to \a server. Sets \a port_num to the port number. +/// If \a handshaker_factory is not NULL, it will be used to create +/// handshakers for the port. +/// Takes ownership of \a args and \a handshaker_factory. +grpc_error *grpc_chttp2_server_add_port( + grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, + grpc_channel_args *args, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + int *port_num); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index c18d618f96..366312bd72 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -33,180 +33,28 @@ #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_connect_state { - grpc_server *server; - grpc_pollset *accepting_pollset; - grpc_tcp_server_acceptor *acceptor; - grpc_handshake_manager *handshake_mgr; -} server_connect_state; - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - server_connect_state *state = user_data; - if (error != GRPC_ERROR_NONE) { - const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); - grpc_error_free_string(error_str); - GRPC_ERROR_UNREF(error); - grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); - gpr_free(read_buffer); - } else { - // Beware that the call to grpc_create_chttp2_transport() has to happen - // before grpc_tcp_server_destroy(). This is fine here, but similar code - // asynchronously doing a handshake instead of calling - // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add - // synchronization to avoid this case. - grpc_transport *transport = - grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); - grpc_server_setup_transport(exec_ctx, state->server, transport, - state->accepting_pollset, - grpc_server_get_channel_args(state->server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer); - } - // Clean up. - grpc_channel_args_destroy(args); - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - gpr_free(state); -} - -static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, - grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); - state->server = server; - state->accepting_pollset = accepting_pollset; - state->acceptor = acceptor; - state->handshake_mgr = grpc_handshake_manager_create(); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - const gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), - deadline, acceptor, on_handshake_done, state); -} - -/* Server callback: start listening on our ports */ -static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_pollset **pollsets, size_t pollset_count) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, - server); -} - -/* Server callback: destroy the tcp listener (so we don't generate further - callbacks) */ -static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_closure *destroy_done) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); -} - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { - grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - size_t i; - size_t count = 0; - int port_num = -1; - int port_temp; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_error *err = GRPC_ERROR_NONE; - + int port_num = 0; GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2, (server, addr)); - - grpc_error **errors = NULL; - err = grpc_blocking_resolve_address(addr, "https", &resolved); - if (err != GRPC_ERROR_NONE) { - goto error; - } - - err = grpc_tcp_server_create(&exec_ctx, NULL, - grpc_server_get_channel_args(server), &tcp); + grpc_error* err = grpc_chttp2_server_add_port( + &exec_ctx, server, addr, + grpc_channel_args_copy(grpc_server_get_channel_args(server)), + NULL /* handshaker_factory */, &port_num); if (err != GRPC_ERROR_NONE) { - goto error; - } - - const size_t naddrs = resolved->naddrs; - errors = gpr_malloc(sizeof(*errors) * naddrs); - for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); - if (errors[i] == GRPC_ERROR_NONE) { - if (port_num == -1) { - port_num = port_temp; - } else { - GPR_ASSERT(port_num == port_temp); - } - count++; - } + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); } - if (count == 0) { - char *msg; - gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", - naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); - gpr_free(msg); - goto error; - } else if (count != naddrs) { - char *msg; - gpr_asprintf(&msg, "Only %" PRIuPTR - " addresses added out of total %" PRIuPTR " resolved", - count, naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); - gpr_free(msg); - - const char *warning_message = grpc_error_string(err); - gpr_log(GPR_INFO, "WARNING: %s", warning_message); - grpc_error_free_string(warning_message); - /* we managed to bind some addresses: continue */ - } - grpc_resolved_addresses_destroy(resolved); - - /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy); - goto done; - -/* Error path: cleanup and return */ -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); - } - port_num = 0; - - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "%s", msg); - grpc_error_free_string(msg); - GRPC_ERROR_UNREF(err); - -done: grpc_exec_ctx_finish(&exec_ctx); - if (errors != NULL) { - for (i = 0; i < naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - } - gpr_free(errors); return port_num; } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 942638ad7f..5f41728132 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -38,218 +38,63 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/auth_filters.h" -#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_secure_state { - grpc_server *server; - grpc_tcp_server *tcp; - grpc_server_security_connector *sc; - grpc_server_credentials *creds; - bool is_shutdown; - gpr_mu mu; - grpc_closure tcp_server_shutdown_complete; - grpc_closure *server_destroy_listener_done; -} server_secure_state; - -typedef struct server_secure_connect { - server_secure_state *server_state; - grpc_pollset *accepting_pollset; - grpc_tcp_server_acceptor *acceptor; - grpc_handshake_manager *handshake_mgr; - // TODO(roth): Remove the following two fields when we eliminate - // grpc_server_security_connector_do_handshake(). - gpr_timespec deadline; - grpc_channel_args *args; -} server_secure_connect; - -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - server_secure_connect *connection_state = statep; - if (status == GRPC_SECURITY_OK) { - if (secure_endpoint) { - gpr_mu_lock(&connection_state->server_state->mu); - if (!connection_state->server_state->is_shutdown) { - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args( - connection_state->server_state->server), - secure_endpoint, 0); - grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg( - connection_state->server_state->creds); - args_to_add[1] = grpc_auth_context_to_arg(auth_context); - grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( - connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, - connection_state->accepting_pollset, args_copy); - grpc_channel_args_destroy(args_copy); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); - } else { - /* We need to consume this here, because the server may already have - * gone away. */ - grpc_endpoint_destroy(exec_ctx, secure_endpoint); - } - gpr_mu_unlock(&connection_state->server_state->mu); - } - } else { - gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); - } - grpc_channel_args_destroy(connection_state->args); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - server_secure_connect *connection_state = user_data; - if (error != GRPC_ERROR_NONE) { - const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); - grpc_error_free_string(error_str); - GRPC_ERROR_UNREF(error); - grpc_channel_args_destroy(args); - gpr_free(read_buffer); - grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr); - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); - return; - } - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - connection_state->handshake_mgr = NULL; - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - connection_state->args = args; - grpc_server_security_connector_do_handshake( - exec_ctx, connection_state->server_state->sc, connection_state->acceptor, - endpoint, read_buffer, connection_state->deadline, - on_secure_handshake_done, connection_state); -} - -static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, - grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - server_secure_state *server_state = statep; - server_secure_connect *connection_state = NULL; - gpr_mu_lock(&server_state->mu); - if (server_state->is_shutdown) { - gpr_mu_unlock(&server_state->mu); - grpc_endpoint_destroy(exec_ctx, tcp); - return; - } - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_ref(server_state->tcp); - connection_state = gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = server_state; - connection_state->accepting_pollset = accepting_pollset; - connection_state->acceptor = acceptor; - connection_state->handshake_mgr = grpc_handshake_manager_create(); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - connection_state->deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, connection_state->handshake_mgr, tcp, - grpc_server_get_channel_args(connection_state->server_state->server), - connection_state->deadline, acceptor, on_handshake_done, - connection_state); +typedef struct { + grpc_chttp2_server_handshaker_factory base; + grpc_server_security_connector *security_connector; +} server_security_handshaker_factory; + +static void server_security_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, + grpc_handshake_manager *handshake_mgr) { + server_security_handshaker_factory *handshaker_factory = + (server_security_handshaker_factory *)hf; + grpc_server_security_connector_create_handshakers( + exec_ctx, handshaker_factory->security_connector, handshake_mgr); } -/* Server callback: start listening on our ports */ -static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, - void *statep, grpc_pollset **pollsets, - size_t pollset_count) { - server_secure_state *server_state = statep; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = false; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count, - on_accept, server_state); +static void server_security_handshaker_factory_destroy( + grpc_exec_ctx* exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { + server_security_handshaker_factory *handshaker_factory = + (server_security_handshaker_factory *)hf; + GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base, + "server"); + gpr_free(hf); } -static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep, - grpc_error *error) { - server_secure_state *server_state = statep; - /* ensure all threads have unlocked */ - gpr_mu_lock(&server_state->mu); - grpc_closure *destroy_done = server_state->server_destroy_listener_done; - GPR_ASSERT(server_state->is_shutdown); - gpr_mu_unlock(&server_state->mu); - /* clean up */ - grpc_server_security_connector_shutdown(exec_ctx, server_state->sc); - - /* Flush queued work before a synchronous unref. */ - grpc_exec_ctx_flush(exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server"); - grpc_server_credentials_unref(server_state->creds); - - if (destroy_done != NULL) { - destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); - grpc_exec_ctx_flush(exec_ctx); - } - gpr_free(server_state); -} - -static void server_destroy_listener(grpc_exec_ctx *exec_ctx, - grpc_server *server, void *statep, - grpc_closure *callback) { - server_secure_state *server_state = statep; - grpc_tcp_server *tcp; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = true; - server_state->server_destroy_listener_done = callback; - tcp = server_state->tcp; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, server_state->tcp); -} +static const grpc_chttp2_server_handshaker_factory_vtable + server_security_handshaker_factory_vtable = { + server_security_handshaker_factory_create_handshakers, + server_security_handshaker_factory_destroy}; int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { - grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - server_secure_state *server_state = NULL; - size_t i; - size_t count = 0; - int port_num = -1; - int port_temp; - grpc_security_status status = GRPC_SECURITY_ERROR; - grpc_server_security_connector *sc = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *err = GRPC_ERROR_NONE; - grpc_error **errors = NULL; - + grpc_server_security_connector *sc = NULL; + int port_num = 0; GRPC_API_TRACE( "grpc_server_add_secure_http2_port(" "server=%p, addr=%s, creds=%p)", 3, (server, addr, creds)); - - /* create security context */ + // Create security context. if (creds == NULL) { err = GRPC_ERROR_CREATE( "No credentials specified for secure server port (creds==NULL)"); - goto error; + goto done; } - status = grpc_server_credentials_create_security_connector(creds, &sc); + grpc_security_status status = + grpc_server_credentials_create_security_connector(creds, &sc); if (status != GRPC_SECURITY_OK) { char *msg; gpr_asprintf(&msg, @@ -258,107 +103,28 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, err = grpc_error_set_int(GRPC_ERROR_CREATE(msg), GRPC_ERROR_INT_SECURITY_STATUS, status); gpr_free(msg); - goto error; + goto done; } - sc->channel_args = grpc_server_get_channel_args(server); - - /* resolve address */ - err = grpc_blocking_resolve_address(addr, "https", &resolved); - if (err != GRPC_ERROR_NONE) { - goto error; - } - server_state = gpr_malloc(sizeof(*server_state)); - memset(server_state, 0, sizeof(*server_state)); - grpc_closure_init(&server_state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, server_state); - err = grpc_tcp_server_create(&exec_ctx, - &server_state->tcp_server_shutdown_complete, - grpc_server_get_channel_args(server), &tcp); + // Create handshaker factory. + server_security_handshaker_factory* handshaker_factory = + gpr_malloc(sizeof(*handshaker_factory)); + memset(handshaker_factory, 0, sizeof(*handshaker_factory)); + handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable; + handshaker_factory->security_connector = sc; + // Create channel args. + grpc_arg channel_arg = grpc_server_credentials_to_arg(creds); + grpc_channel_args *args = grpc_channel_args_copy_and_add( + grpc_server_get_channel_args(server), &channel_arg, 1); + // Add server port. + err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, + &handshaker_factory->base, &port_num); +done: + grpc_exec_ctx_finish(&exec_ctx); if (err != GRPC_ERROR_NONE) { - goto error; + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); } - - server_state->server = server; - server_state->tcp = tcp; - server_state->sc = sc; - server_state->creds = grpc_server_credentials_ref(creds); - server_state->is_shutdown = true; - gpr_mu_init(&server_state->mu); - - errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); - for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); - if (errors[i] == GRPC_ERROR_NONE) { - if (port_num == -1) { - port_num = port_temp; - } else { - GPR_ASSERT(port_num == port_temp); - } - count++; - } - } - if (count == 0) { - char *msg; - gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", - resolved->naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); - gpr_free(msg); - goto error; - } else if (count != resolved->naddrs) { - char *msg; - gpr_asprintf(&msg, "Only %" PRIuPTR - " addresses added out of total %" PRIuPTR " resolved", - count, resolved->naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); - gpr_free(msg); - - const char *warning_message = grpc_error_string(err); - gpr_log(GPR_INFO, "WARNING: %s", warning_message); - grpc_error_free_string(warning_message); - /* we managed to bind some addresses: continue */ - } else { - for (i = 0; i < resolved->naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - } - gpr_free(errors); - errors = NULL; - grpc_resolved_addresses_destroy(resolved); - - /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, server_state, - server_start_listener, server_destroy_listener); - - grpc_exec_ctx_finish(&exec_ctx); return port_num; - -/* Error path: cleanup and return */ -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (errors != NULL) { - for (i = 0; i < resolved->naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - gpr_free(errors); - } - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); - } else { - if (sc) { - grpc_exec_ctx_flush(&exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server"); - } - if (server_state) { - gpr_free(server_state); - } - } - grpc_exec_ctx_finish(&exec_ctx); - const char *msg = grpc_error_string(err); - GRPC_ERROR_UNREF(err); - gpr_log(GPR_ERROR, "%s", msg); - grpc_error_free_string(msg); - return 0; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 4e3c7ff681..3e7c078d3c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -111,9 +111,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *error); static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -428,6 +425,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_writable_stream(t, &s)) { + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); @@ -523,6 +521,10 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } } + if (s->fail_pending_writes_on_writes_finished_error != NULL) { + GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error); + } + GPR_ASSERT(s->send_initial_metadata_finished == NULL); GPR_ASSERT(s->fetching_send_message == NULL); GPR_ASSERT(s->send_trailing_metadata_finished == NULL); @@ -704,8 +706,6 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, } } - grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); - switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: GPR_UNREACHABLE_CODE(break); @@ -734,6 +734,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, break; } + grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GPR_TIMER_END("terminate_writing_with_lock", 0); } @@ -1404,6 +1406,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); } @@ -1534,9 +1537,41 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *error) { +void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + if (s->need_fail_pending_writes_on_writes_finished) { + grpc_error *error = s->fail_pending_writes_on_writes_finished_error; + s->fail_pending_writes_on_writes_finished_error = NULL; + s->need_fail_pending_writes_on_writes_finished = false; + grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error); + } +} + +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_error *error) { + if (s->need_fail_pending_writes_on_writes_finished || + (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE && + (s->included[GRPC_CHTTP2_LIST_WRITABLE] || + s->included[GRPC_CHTTP2_LIST_WRITING]))) { + /* If a write is in progress, and it involves this stream, wait for the + * write to complete before cancelling things out. If we don't do this, then + * our combiner lock might think that some operation on its queue might be + * covering a completion even though there is none, in which case we might + * offload to another thread, which isn't guarateed to exist */ + if (error != GRPC_ERROR_NONE) { + if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) { + s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE( + "Post-poned fail writes due to in-progress write"); + } + s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child( + s->fail_pending_writes_on_writes_finished_error, error); + } + s->need_fail_pending_writes_on_writes_finished = true; + return; /* early out */ + } + error = removal_error(error, s, "Pending writes failed due to stream closure"); s->send_initial_metadata = NULL; @@ -1590,7 +1625,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (close_writes && !s->write_closed) { s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed = true; - fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (s->read_closed && s->write_closed) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b74233d992..6cba1e7fd2 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -409,6 +409,9 @@ struct grpc_chttp2_stream { grpc_error *read_closed_error; /** the error that resulted in this stream being write-closed */ grpc_error *write_closed_error; + /** should any writes be cleared once this stream becomes non-writable */ + bool need_fail_pending_writes_on_writes_finished; + grpc_error *fail_pending_writes_on_writes_finished_error; grpc_published_metadata_method published_metadata[2]; bool final_metadata_requested; @@ -689,4 +692,11 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s); +void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_error *error); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 139e7387c4..769b229a0d 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -208,6 +208,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); } } else { + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write"); } } @@ -252,6 +253,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, GRPC_ERROR_REF(error)); } + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); } grpc_slice_buffer_reset_and_unref(&t->outbuf); diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index a45a39981c..90626dc2d1 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -38,67 +38,66 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/timer.h" // // grpc_handshaker // -void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, +void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker) { handshaker->vtable = vtable; } -void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { +static void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { handshaker->vtable->destroy(exec_ctx, handshaker); } -void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { +static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { handshaker->vtable->shutdown(exec_ctx, handshaker); } -void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - gpr_timespec deadline, - grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data) { - handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args, - read_buffer, deadline, acceptor, cb, - user_data); +static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, + grpc_tcp_server_acceptor* acceptor, + grpc_closure* on_handshake_done, + grpc_handshaker_args* args) { + handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor, + on_handshake_done, args); } // // grpc_handshake_manager // -// State used while chaining handshakers. -struct grpc_handshaker_state { - // The index of the handshaker to invoke next. - size_t index; - // The deadline for all handshakers. - gpr_timespec deadline; - // The acceptor to call the handshakers with. - grpc_tcp_server_acceptor* acceptor; - // The final callback and user_data to invoke after the last handshaker. - grpc_handshaker_done_cb final_cb; - void* final_user_data; -}; - struct grpc_handshake_manager { + gpr_mu mu; + gpr_refcount refs; + bool shutdown; // An array of handshakers added via grpc_handshake_manager_add(). size_t count; grpc_handshaker** handshakers; - // State used while chaining handshakers. - struct grpc_handshaker_state* state; + // The index of the handshaker to invoke next and closure to invoke it. + size_t index; + grpc_closure call_next_handshaker; + // The acceptor to call the handshakers with. + grpc_tcp_server_acceptor* acceptor; + // Deadline timer across all handshakers. + grpc_timer deadline_timer; + // The final callback and user_data to invoke after the last handshaker. + grpc_closure on_handshake_done; + void* user_data; + // Handshaker args. + grpc_handshaker_args args; }; grpc_handshake_manager* grpc_handshake_manager_create() { grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager)); memset(mgr, 0, sizeof(*mgr)); + gpr_mu_init(&mgr->mu); + gpr_ref_init(&mgr->refs, 1); return mgr; } @@ -106,6 +105,7 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; } void grpc_handshake_manager_add(grpc_handshake_manager* mgr, grpc_handshaker* handshaker) { + gpr_mu_lock(&mgr->mu); // To avoid allocating memory for each handshaker we add, we double // the number of elements every time we need more. size_t realloc_count = 0; @@ -119,85 +119,116 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr, gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*)); } mgr->handshakers[mgr->count++] = handshaker; + gpr_mu_unlock(&mgr->mu); +} + +static void grpc_handshake_manager_unref(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { + if (gpr_unref(&mgr->refs)) { + for (size_t i = 0; i < mgr->count; ++i) { + grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); + } + gpr_free(mgr->handshakers); + gpr_mu_destroy(&mgr->mu); + gpr_free(mgr); + } } void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { - for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); - } - gpr_free(mgr->handshakers); - gpr_free(mgr); + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { - for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]); + gpr_mu_lock(&mgr->mu); + // Shutdown the handshaker that's currently in progress, if any. + if (!mgr->shutdown && mgr->index > 0) { + mgr->shutdown = true; + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]); } - if (mgr->state != NULL) { - gpr_free(mgr->state); - mgr->state = NULL; + gpr_mu_unlock(&mgr->mu); +} + +// Helper function to call either the next handshaker or the +// on_handshake_done callback. +// Returns true if we've scheduled the on_handshake_done callback. +static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr, + grpc_error* error) { + GPR_ASSERT(mgr->index <= mgr->count); + // If we got an error or we've been shut down or we've finished the last + // handshaker, invoke the on_handshake_done callback. Otherwise, call the + // next handshaker. + if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->index == mgr->count) { + // Cancel deadline timer, since we're invoking the on_handshake_done + // callback now. + grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); + grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL); + mgr->shutdown = true; + } else { + grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index], + mgr->acceptor, &mgr->call_next_handshaker, + &mgr->args); } + ++mgr->index; + return mgr->shutdown; } // A function used as the handshaker-done callback when chaining // handshakers together. -static void call_next_handshaker(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - void* user_data, grpc_error* error) { - grpc_handshake_manager* mgr = user_data; - GPR_ASSERT(mgr->state != NULL); - GPR_ASSERT(mgr->state->index < mgr->count); - // If we got an error, skip all remaining handshakers and invoke the - // caller-supplied callback immediately. - if (error != GRPC_ERROR_NONE) { - mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer, - mgr->state->final_user_data, error); - return; +static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_handshake_manager* mgr = arg; + gpr_mu_lock(&mgr->mu); + bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&mgr->mu); + // If we're invoked the final callback, we won't be coming back + // to this function, so we can release our reference to the + // handshake manager. + if (done) { + grpc_handshake_manager_unref(exec_ctx, mgr); } - grpc_handshaker_done_cb cb = call_next_handshaker; - // If this is the last handshaker, use the caller-supplied callback - // and user_data instead of chaining back to this function again. - if (mgr->state->index == mgr->count - 1) { - cb = mgr->state->final_cb; - user_data = mgr->state->final_user_data; - } - // Invoke handshaker. - grpc_handshaker_do_handshake( - exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args, - read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data); - ++mgr->state->index; - // If this is the last handshaker, clean up state. - if (mgr->state->index == mgr->count) { - gpr_free(mgr->state); - mgr->state = NULL; +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_handshake_manager* mgr = arg; + if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. + grpc_handshake_manager_shutdown(exec_ctx, mgr); } + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, const grpc_channel_args* args, + grpc_endpoint* endpoint, const grpc_channel_args* channel_args, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data) { - grpc_channel_args* args_copy = grpc_channel_args_copy(args); - grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer)); - grpc_slice_buffer_init(read_buffer); - if (mgr->count == 0) { - // No handshakers registered, so we just immediately call the done - // callback with the passed-in endpoint. - cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE); - } else { - GPR_ASSERT(mgr->state == NULL); - mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); - memset(mgr->state, 0, sizeof(*mgr->state)); - mgr->state->deadline = deadline; - mgr->state->acceptor = acceptor; - mgr->state->final_cb = cb; - mgr->state->final_user_data = user_data; - call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr, - GRPC_ERROR_NONE); + grpc_iomgr_cb_func on_handshake_done, void* user_data) { + gpr_mu_lock(&mgr->mu); + GPR_ASSERT(mgr->index == 0); + GPR_ASSERT(!mgr->shutdown); + // Construct handshaker args. These will be passed through all + // handshakers and eventually be freed by the on_handshake_done callback. + mgr->args.endpoint = endpoint; + mgr->args.args = grpc_channel_args_copy(channel_args); + mgr->args.user_data = user_data; + mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer)); + grpc_slice_buffer_init(mgr->args.read_buffer); + // Initialize state needed for calling handshakers. + mgr->acceptor = acceptor; + grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr); + grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args); + // Start deadline timer, which owns a ref. + gpr_ref(&mgr->refs); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC)); + // Start first handshaker, which also owns a ref. + gpr_ref(&mgr->refs); + bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); + gpr_mu_unlock(&mgr->mu); + if (done) { + grpc_handshake_manager_unref(exec_ctx, mgr); } } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index f8a36c6473..ebbc1ff7f3 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -54,15 +54,30 @@ typedef struct grpc_handshaker grpc_handshaker; -/// Callback type invoked when a handshaker is done. -/// Takes ownership of \a args and \a read_buffer. -typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - void* user_data, grpc_error* error); - -struct grpc_handshaker_vtable { +/// Arguments passed through handshakers and to the on_handshake_done callback. +/// +/// For handshakers, all members are input/output parameters; for +/// example, a handshaker may read from or write to \a endpoint and +/// then later replace it with a wrapped endpoint. Similarly, a +/// handshaker may modify \a args. +/// +/// A handshaker takes ownership of the members while a handshake is in +/// progress. Upon failure or shutdown of an in-progress handshaker, +/// the handshaker is responsible for destroying the members and setting +/// them to NULL before invoking the on_handshake_done callback. +/// +/// For the on_handshake_done callback, all members are input arguments, +/// which the callback takes ownership of. +typedef struct { + grpc_endpoint* endpoint; + grpc_channel_args* args; + grpc_slice_buffer* read_buffer; + // User data passed through the handshake manager. Not used by + // individual handshakers. + void* user_data; +} grpc_handshaker_args; + +typedef struct { /// Destroys the handshaker. void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); @@ -70,44 +85,26 @@ struct grpc_handshaker_vtable { /// aborted in the middle). void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); - /// Performs handshaking. When finished, calls \a cb with \a user_data. - /// Takes ownership of \a args. - /// Takes ownership of \a read_buffer, which contains leftover bytes read - /// from the endpoint by the previous handshaker. + /// Performs handshaking, modifying \a args as needed (e.g., to + /// replace \a endpoint with a wrapped endpoint). + /// When finished, invokes \a on_handshake_done. /// \a acceptor will be NULL for client-side handshakers. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - grpc_endpoint* endpoint, grpc_channel_args* args, - grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data); -}; + grpc_closure* on_handshake_done, + grpc_handshaker_args* args); +} grpc_handshaker_vtable; /// Base struct. To subclass, make this the first member of the /// implementation struct. struct grpc_handshaker { - const struct grpc_handshaker_vtable* vtable; + const grpc_handshaker_vtable* vtable; }; /// Called by concrete implementations to initialize the base struct. -void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable, +void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker); -/// Convenient wrappers for invoking methods via the vtable. -/// These probably do not need to be called from anywhere but -/// grpc_handshake_manager. -void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker); -void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker); -void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker, - grpc_endpoint* endpoint, - grpc_channel_args* args, - grpc_slice_buffer* read_buffer, - gpr_timespec deadline, - grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data); - /// /// grpc_handshake_manager /// @@ -134,15 +131,21 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr); /// Invokes handshakers in the order they were added. -/// Does NOT take ownership of \a args. Instead, makes a copy before +/// Takes ownership of \a endpoint, and then passes that ownership to +/// the \a on_handshake_done callback. +/// Does NOT take ownership of \a channel_args. Instead, makes a copy before /// invoking the first handshaker. /// \a acceptor will be NULL for client-side handshakers. -/// Invokes \a cb with \a user_data after either a handshaker fails or -/// all handshakers have completed successfully. +/// +/// When done, invokes \a on_handshake_done with a grpc_handshaker_args +/// object as its argument. If the callback is invoked with error != +/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done +/// the necessary clean-up. Otherwise, the callback takes ownership of +/// the arguments. void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, - grpc_endpoint* endpoint, const grpc_channel_args* args, + grpc_endpoint* endpoint, const grpc_channel_args* channel_args, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, - grpc_handshaker_done_cb cb, void* user_data); + grpc_iomgr_cb_func on_handshake_done, void* user_data); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 24d264c32a..0ab34d00e4 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -38,7 +38,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/security/transport/handshake.h" + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/security/transport/security_handshaker.h" #include "src/core/lib/support/string.h" #include "src/core/lib/tsi/ssl_transport_security.h" @@ -58,52 +60,42 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_channel_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, - gpr_timespec deadline, - grpc_security_handshake_done_cb cb, - void *user_data) { +static void httpcli_ssl_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; - tsi_result result = TSI_OK; - tsi_handshaker *handshaker; - if (c->handshaker_factory == NULL) { - gpr_free(read_buffer); - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); - return; - } - result = tsi_ssl_handshaker_factory_create_handshaker( - c->handshaker_factory, c->secure_peer_name, &handshaker); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", - tsi_result_to_string(result)); - gpr_free(read_buffer); - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); - } else { - grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true, - nonsecure_endpoint, read_buffer, deadline, cb, - user_data); + tsi_handshaker *handshaker = NULL; + if (c->handshaker_factory != NULL) { + tsi_result result = tsi_ssl_handshaker_factory_create_handshaker( + c->handshaker_factory, c->secure_peer_name, &handshaker); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", + tsi_result_to_string(result)); + } } + grpc_security_create_handshakers(exec_ctx, handshaker, &sc->base, + handshake_mgr); } static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, - grpc_security_peer_check_cb cb, - void *user_data) { + grpc_auth_context **auth_context, + grpc_closure *on_peer_checked) { grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; - grpc_security_status status = GRPC_SECURITY_OK; + grpc_error *error = GRPC_ERROR_NONE; /* Check the peer name. */ if (c->secure_peer_name != NULL && !tsi_ssl_peer_matches_name(&peer, c->secure_peer_name)) { - gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", - c->secure_peer_name); - status = GRPC_SECURITY_ERROR; + char *msg; + gpr_asprintf(&msg, "Peer name %s is not in peer certificate", + c->secure_peer_name); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); } - cb(exec_ctx, user_data, status, NULL); + grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); tsi_peer_destruct(&peer); } @@ -140,7 +132,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( *sc = NULL; return GRPC_SECURITY_ERROR; } - c->base.do_handshake = httpcli_ssl_do_handshake; + c->base.create_handshakers = httpcli_ssl_create_handshakers; *sc = &c->base; return GRPC_SECURITY_OK; } @@ -150,19 +142,25 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( typedef struct { void (*func)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint); void *arg; + grpc_handshake_manager *handshake_mgr; } on_done_closure; -static void on_secure_transport_setup_done(grpc_exec_ctx *exec_ctx, void *rp, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - on_done_closure *c = rp; - if (status != GRPC_SECURITY_OK) { - gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + on_done_closure *c = args->user_data; + if (error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(error); + gpr_log(GPR_ERROR, "Secure transport setup failed: %s", msg); + grpc_error_free_string(msg); c->func(exec_ctx, c->arg, NULL); } else { - c->func(exec_ctx, c->arg, secure_endpoint); + grpc_channel_args_destroy(args->args); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + c->func(exec_ctx, c->arg, args->endpoint); } + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } @@ -183,11 +181,15 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, } c->func = on_done; c->arg = arg; + c->handshake_mgr = grpc_handshake_manager_create(); GPR_ASSERT(httpcli_ssl_channel_security_connector_create( pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_channel_security_connector_do_handshake( - exec_ctx, sc, tcp, NULL, deadline, on_secure_transport_setup_done, c); + grpc_channel_security_connector_create_handshakers(exec_ctx, sc, + c->handshake_mgr); + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline, + NULL /* acceptor */, on_handshake_done, c /* user_data */); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 60ee14eb23..cfc67020ae 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -90,6 +90,12 @@ static bool is_covered_by_poller(grpc_combiner *lock) { gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0; } +#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d" +#define IS_COVERED_BY_POLLER_ARGS(lock) \ + (lock)->final_list_covered_by_poller, \ + gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \ + is_covered_by_poller((lock)) + grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); lock->next_combiner_on_this_exec_ctx = NULL; @@ -197,9 +203,10 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p grpc_combiner_continue_exec_ctx workqueue=%p " - "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d " + "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT + " exec_ctx_ready_to_finish=%d " "time_to_execute_final_list=%d", - lock, lock->optional_workqueue, is_covered_by_poller(lock), + lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock), grpc_exec_ctx_ready_to_finish(exec_ctx), lock->time_to_execute_final_list)); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 379bf9bd23..213d29600c 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -144,6 +144,12 @@ struct grpc_resource_quota { /* Closure around rq_reclamation_done */ grpc_closure rq_reclamation_done_closure; + /* This is only really usable for debugging: it's always a stale pointer, but + a stale pointer that might just be fresh enough to guide us to where the + reclamation system is stuck */ + grpc_closure *debug_only_last_initiated_reclaimer; + grpc_resource_user *debug_only_last_reclaimer_resource_user; + /* Roots of all resource user lists */ grpc_resource_user *roots[GRPC_RULIST_COUNT]; @@ -225,6 +231,7 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) { resource_user->links[list].prev; resource_user->links[list].prev->links[list].next = resource_user->links[list].next; + resource_user->links[list].next = resource_user->links[list].prev = NULL; } /******************************************************************************* @@ -340,6 +347,9 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx, resource_quota->reclaiming = true; grpc_resource_quota_internal_ref(resource_quota); grpc_closure *c = resource_user->reclaimers[destructive]; + GPR_ASSERT(c); + resource_quota->debug_only_last_reclaimer_resource_user = resource_user; + resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = NULL; grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE); return true; @@ -476,6 +486,8 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { GRPC_ERROR_CANCELLED, NULL); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; + rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); + rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index fd0c7a0f9d..3c24ea9afa 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -388,7 +388,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, /* Try listening on IPv6 first. */ addr = &wild6; // TODO(rjshade): Test and propagate the returned grpc_error*: - grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, + &dsmode, &fd)); allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; @@ -402,7 +403,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, } // TODO(rjshade): Test and propagate the returned grpc_error*: - grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); + GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, + &dsmode, &fd)); if (fd < 0) { gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c deleted file mode 100644 index 9623797610..0000000000 --- a/src/core/lib/security/transport/handshake.c +++ /dev/null @@ -1,374 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/security/transport/handshake.h" - -#include <stdbool.h> -#include <string.h> - -#include <grpc/slice_buffer.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/security/context/security_context.h" -#include "src/core/lib/security/transport/secure_endpoint.h" -#include "src/core/lib/security/transport/tsi_error.h" - -#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256 - -typedef struct { - grpc_security_connector *connector; - tsi_handshaker *handshaker; - bool is_client_side; - unsigned char *handshake_buffer; - size_t handshake_buffer_size; - grpc_endpoint *wrapped_endpoint; - grpc_endpoint *secure_endpoint; - grpc_slice_buffer left_overs; - grpc_slice_buffer incoming; - grpc_slice_buffer outgoing; - grpc_security_handshake_done_cb cb; - void *user_data; - grpc_closure on_handshake_data_sent_to_peer; - grpc_closure on_handshake_data_received_from_peer; - grpc_auth_context *auth_context; - grpc_timer timer; - gpr_refcount refs; -} grpc_security_handshake; - -static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, - void *setup, - grpc_error *error); - -static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup, - grpc_error *error); - -static void security_connector_remove_handshake(grpc_security_handshake *h) { - GPR_ASSERT(!h->is_client_side); - grpc_security_connector_handshake_list *node; - grpc_security_connector_handshake_list *tmp; - grpc_server_security_connector *sc = - (grpc_server_security_connector *)h->connector; - gpr_mu_lock(&sc->mu); - node = sc->handshaking_handshakes; - if (node && node->handshake == h) { - sc->handshaking_handshakes = node->next; - gpr_free(node); - gpr_mu_unlock(&sc->mu); - return; - } - while (node) { - if (node->next->handshake == h) { - tmp = node->next; - node->next = node->next->next; - gpr_free(tmp); - gpr_mu_unlock(&sc->mu); - return; - } - node = node->next; - } - gpr_mu_unlock(&sc->mu); -} - -static void unref_handshake(grpc_security_handshake *h) { - if (gpr_unref(&h->refs)) { - if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker); - if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer); - grpc_slice_buffer_destroy(&h->left_overs); - grpc_slice_buffer_destroy(&h->outgoing); - grpc_slice_buffer_destroy(&h->incoming); - GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); - GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); - gpr_free(h); - } -} - -static void security_handshake_done(grpc_exec_ctx *exec_ctx, - grpc_security_handshake *h, - grpc_error *error) { - grpc_timer_cancel(exec_ctx, &h->timer); - if (!h->is_client_side) { - security_connector_remove_handshake(h); - } - if (error == GRPC_ERROR_NONE) { - h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint, - h->auth_context); - } else { - const char *msg = grpc_error_string(error); - gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg); - grpc_error_free_string(msg); - - if (h->secure_endpoint != NULL) { - grpc_endpoint_shutdown(exec_ctx, h->secure_endpoint); - grpc_endpoint_destroy(exec_ctx, h->secure_endpoint); - } else { - grpc_endpoint_destroy(exec_ctx, h->wrapped_endpoint); - } - h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, NULL, NULL); - } - unref_handshake(h); - GRPC_ERROR_UNREF(error); -} - -static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_security_status status, - grpc_auth_context *auth_context) { - grpc_security_handshake *h = user_data; - tsi_frame_protector *protector; - tsi_result result; - if (status != GRPC_SECURITY_OK) { - security_handshake_done( - exec_ctx, h, - grpc_error_set_int(GRPC_ERROR_CREATE("Error checking peer."), - GRPC_ERROR_INT_SECURITY_STATUS, status)); - return; - } - h->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "handshake"); - result = - tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); - if (result != TSI_OK) { - security_handshake_done( - exec_ctx, h, - grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Frame protector creation failed"), result)); - return; - } - h->secure_endpoint = - grpc_secure_endpoint_create(protector, h->wrapped_endpoint, - h->left_overs.slices, h->left_overs.count); - h->left_overs.count = 0; - h->left_overs.length = 0; - security_handshake_done(exec_ctx, h, GRPC_ERROR_NONE); - return; -} - -static void check_peer(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h) { - tsi_peer peer; - tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); - - if (result != TSI_OK) { - security_handshake_done( - exec_ctx, h, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Peer extraction failed"), result)); - return; - } - grpc_security_connector_check_peer(exec_ctx, h->connector, peer, - on_peer_checked, h); -} - -static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx, - grpc_security_handshake *h) { - size_t offset = 0; - tsi_result result = TSI_OK; - grpc_slice to_send; - - do { - size_t to_send_size = h->handshake_buffer_size - offset; - result = tsi_handshaker_get_bytes_to_send_to_peer( - h->handshaker, h->handshake_buffer + offset, &to_send_size); - offset += to_send_size; - if (result == TSI_INCOMPLETE_DATA) { - h->handshake_buffer_size *= 2; - h->handshake_buffer = - gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); - } - } while (result == TSI_INCOMPLETE_DATA); - - if (result != TSI_OK) { - security_handshake_done(exec_ctx, h, - grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Handshake failed"), result)); - return; - } - - to_send = - grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); - grpc_slice_buffer_reset_and_unref(&h->outgoing); - grpc_slice_buffer_add(&h->outgoing, to_send); - /* TODO(klempner,jboeuf): This should probably use the client setup - deadline */ - grpc_endpoint_write(exec_ctx, h->wrapped_endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer); -} - -static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, - void *handshake, - grpc_error *error) { - grpc_security_handshake *h = handshake; - size_t consumed_slice_size = 0; - tsi_result result = TSI_OK; - size_t i; - size_t num_left_overs; - int has_left_overs_in_current_slice = 0; - - if (error != GRPC_ERROR_NONE) { - security_handshake_done( - exec_ctx, h, - GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1)); - return; - } - - for (i = 0; i < h->incoming.count; i++) { - consumed_slice_size = GRPC_SLICE_LENGTH(h->incoming.slices[i]); - result = tsi_handshaker_process_bytes_from_peer( - h->handshaker, GRPC_SLICE_START_PTR(h->incoming.slices[i]), - &consumed_slice_size); - if (!tsi_handshaker_is_in_progress(h->handshaker)) break; - } - - if (tsi_handshaker_is_in_progress(h->handshaker)) { - /* We may need more data. */ - if (result == TSI_INCOMPLETE_DATA) { - grpc_endpoint_read(exec_ctx, h->wrapped_endpoint, &h->incoming, - &h->on_handshake_data_received_from_peer); - return; - } else { - send_handshake_bytes_to_peer(exec_ctx, h); - return; - } - } - - if (result != TSI_OK) { - security_handshake_done(exec_ctx, h, - grpc_set_tsi_error_result( - GRPC_ERROR_CREATE("Handshake failed"), result)); - return; - } - - /* Handshake is done and successful this point. */ - has_left_overs_in_current_slice = - (consumed_slice_size < GRPC_SLICE_LENGTH(h->incoming.slices[i])); - num_left_overs = - (has_left_overs_in_current_slice ? 1 : 0) + h->incoming.count - i - 1; - if (num_left_overs == 0) { - check_peer(exec_ctx, h); - return; - } - - /* Put the leftovers in our buffer (ownership transfered). */ - if (has_left_overs_in_current_slice) { - grpc_slice_buffer_add( - &h->left_overs, - grpc_slice_split_tail(&h->incoming.slices[i], consumed_slice_size)); - grpc_slice_unref( - h->incoming.slices[i]); /* split_tail above increments refcount. */ - } - grpc_slice_buffer_addn( - &h->left_overs, &h->incoming.slices[i + 1], - num_left_overs - (size_t)has_left_overs_in_current_slice); - check_peer(exec_ctx, h); -} - -/* If handshake is NULL, the handshake is done. */ -static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, - void *handshake, grpc_error *error) { - grpc_security_handshake *h = handshake; - - /* Make sure that write is OK. */ - if (error != GRPC_ERROR_NONE) { - if (handshake != NULL) - security_handshake_done( - exec_ctx, h, - GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1)); - return; - } - - /* We may be done. */ - if (tsi_handshaker_is_in_progress(h->handshaker)) { - /* TODO(klempner,jboeuf): This should probably use the client setup - deadline */ - grpc_endpoint_read(exec_ctx, h->wrapped_endpoint, &h->incoming, - &h->on_handshake_data_received_from_peer); - } else { - check_peer(exec_ctx, h); - } -} - -static void on_timeout(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_security_handshake *h = arg; - if (error == GRPC_ERROR_NONE) { - grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint); - } - unref_handshake(h); -} - -void grpc_do_security_handshake( - grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, - grpc_security_connector *connector, bool is_client_side, - grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer, - gpr_timespec deadline, grpc_security_handshake_done_cb cb, - void *user_data) { - grpc_security_connector_handshake_list *handshake_node; - grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake)); - memset(h, 0, sizeof(grpc_security_handshake)); - h->handshaker = handshaker; - h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); - h->is_client_side = is_client_side; - h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; - h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); - h->wrapped_endpoint = nonsecure_endpoint; - h->user_data = user_data; - h->cb = cb; - gpr_ref_init(&h->refs, 2); /* timer and handshake proper each get a ref */ - grpc_closure_init(&h->on_handshake_data_sent_to_peer, - on_handshake_data_sent_to_peer, h); - grpc_closure_init(&h->on_handshake_data_received_from_peer, - on_handshake_data_received_from_peer, h); - grpc_slice_buffer_init(&h->left_overs); - grpc_slice_buffer_init(&h->outgoing); - grpc_slice_buffer_init(&h->incoming); - if (read_buffer != NULL) { - grpc_slice_buffer_move_into(read_buffer, &h->incoming); - gpr_free(read_buffer); - } - if (!is_client_side) { - grpc_server_security_connector *server_connector = - (grpc_server_security_connector *)connector; - handshake_node = gpr_malloc(sizeof(grpc_security_connector_handshake_list)); - handshake_node->handshake = h; - gpr_mu_lock(&server_connector->mu); - handshake_node->next = server_connector->handshaking_handshakes; - server_connector->handshaking_handshakes = handshake_node; - gpr_mu_unlock(&server_connector->mu); - } - send_handshake_bytes_to_peer(exec_ctx, h); - grpc_timer_init(exec_ctx, &h->timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_timeout, h, gpr_now(GPR_CLOCK_MONOTONIC)); -} - -void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, - void *handshake) { - grpc_security_handshake *h = handshake; - grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint); -} diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 0fbd63a7e1..a2e0c7c7c7 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -46,8 +46,8 @@ #include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/handshake.h" #include "src/core/lib/security/transport/secure_endpoint.h" +#include "src/core/lib/security/transport/security_handshaker.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/tsi/fake_transport_security.h" @@ -111,58 +111,34 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, return NULL; } -void grpc_server_security_connector_shutdown( - grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector) { - grpc_security_connector_handshake_list *tmp; - gpr_mu_lock(&connector->mu); - while (connector->handshaking_handshakes) { - tmp = connector->handshaking_handshakes; - grpc_security_handshake_shutdown( - exec_ctx, connector->handshaking_handshakes->handshake); - connector->handshaking_handshakes = tmp->next; - gpr_free(tmp); +void grpc_channel_security_connector_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, + grpc_handshake_manager *handshake_mgr) { + if (connector != NULL) { + connector->create_handshakers(exec_ctx, connector, handshake_mgr); } - gpr_mu_unlock(&connector->mu); } -void grpc_channel_security_connector_do_handshake( - grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer, - gpr_timespec deadline, grpc_security_handshake_done_cb cb, - void *user_data) { - if (sc == NULL || nonsecure_endpoint == NULL) { - gpr_free(read_buffer); - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); - } else { - sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, read_buffer, deadline, - cb, user_data); - } -} - -void grpc_server_security_connector_do_handshake( - grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, - grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, gpr_timespec deadline, - grpc_security_handshake_done_cb cb, void *user_data) { - if (sc == NULL || nonsecure_endpoint == NULL) { - gpr_free(read_buffer); - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); - } else { - sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, read_buffer, - deadline, cb, user_data); +void grpc_server_security_connector_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector, + grpc_handshake_manager *handshake_mgr) { + if (connector != NULL) { + connector->create_handshakers(exec_ctx, connector, handshake_mgr); } } void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, - grpc_security_peer_check_cb cb, - void *user_data) { + grpc_auth_context **auth_context, + grpc_closure *on_peer_checked) { if (sc == NULL) { - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL); + grpc_exec_ctx_sched( + exec_ctx, on_peer_checked, + GRPC_ERROR_CREATE("cannot check peer -- no security connector"), NULL); tsi_peer_destruct(&peer); } else { - sc->vtable->check_peer(exec_ctx, sc, peer, cb, user_data); + sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); } } @@ -262,45 +238,41 @@ static void fake_channel_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static void fake_server_destroy(grpc_security_connector *sc) { - grpc_server_security_connector *c = (grpc_server_security_connector *)sc; - gpr_mu_destroy(&c->mu); - gpr_free(sc); -} +static void fake_server_destroy(grpc_security_connector *sc) { gpr_free(sc); } static void fake_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, - grpc_security_peer_check_cb cb, void *user_data) { + grpc_auth_context **auth_context, + grpc_closure *on_peer_checked) { const char *prop_name; - grpc_security_status status = GRPC_SECURITY_OK; - grpc_auth_context *auth_context = NULL; + grpc_error *error = GRPC_ERROR_NONE; + *auth_context = NULL; if (peer.property_count != 1) { - gpr_log(GPR_ERROR, "Fake peers should only have 1 property."); - status = GRPC_SECURITY_ERROR; + error = GRPC_ERROR_CREATE("Fake peers should only have 1 property."); goto end; } prop_name = peer.properties[0].name; if (prop_name == NULL || strcmp(prop_name, TSI_CERTIFICATE_TYPE_PEER_PROPERTY)) { - gpr_log(GPR_ERROR, "Unexpected property in fake peer: %s.", - prop_name == NULL ? "<EMPTY>" : prop_name); - status = GRPC_SECURITY_ERROR; + char *msg; + gpr_asprintf(&msg, "Unexpected property in fake peer: %s.", + prop_name == NULL ? "<EMPTY>" : prop_name); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); goto end; } if (strncmp(peer.properties[0].value.data, TSI_FAKE_CERTIFICATE_TYPE, peer.properties[0].value.length)) { - gpr_log(GPR_ERROR, "Invalid value for cert type property."); - status = GRPC_SECURITY_ERROR; + error = GRPC_ERROR_CREATE("Invalid value for cert type property."); goto end; } - auth_context = grpc_auth_context_create(NULL); + *auth_context = grpc_auth_context_create(NULL); grpc_auth_context_add_cstring_property( - auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, + *auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, GRPC_FAKE_TRANSPORT_SECURITY_TYPE); end: - cb(exec_ctx, user_data, status, auth_context); - grpc_auth_context_unref(auth_context); + grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); tsi_peer_destruct(&peer); } @@ -313,26 +285,20 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, cb(exec_ctx, user_data, GRPC_SECURITY_OK); } -static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_channel_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, - gpr_timespec deadline, - grpc_security_handshake_done_cb cb, - void *user_data) { - grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), &sc->base, - true, nonsecure_endpoint, read_buffer, deadline, - cb, user_data); +static void fake_channel_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { + grpc_security_create_handshakers( + exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base, + handshake_mgr); } -static void fake_server_do_handshake( +static void fake_server_create_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, - grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, gpr_timespec deadline, - grpc_security_handshake_done_cb cb, void *user_data) { - grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), &sc->base, - false, nonsecure_endpoint, read_buffer, deadline, - cb, user_data); + grpc_handshake_manager *handshake_mgr) { + grpc_security_create_handshakers( + exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base, + handshake_mgr); } static grpc_security_connector_vtable fake_channel_vtable = { @@ -350,7 +316,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( c->base.vtable = &fake_channel_vtable; c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->check_call_host = fake_channel_check_call_host; - c->do_handshake = fake_channel_do_handshake; + c->create_handshakers = fake_channel_create_handshakers; return c; } @@ -362,8 +328,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &fake_server_vtable; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; - c->do_handshake = fake_server_do_handshake; - gpr_mu_init(&c->mu); + c->create_handshakers = fake_server_create_handshakers; return c; } @@ -396,11 +361,9 @@ static void ssl_channel_destroy(grpc_security_connector *sc) { static void ssl_server_destroy(grpc_security_connector *sc) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; - if (c->handshaker_factory != NULL) { tsi_ssl_handshaker_factory_destroy(c->handshaker_factory); } - gpr_mu_destroy(&c->base.mu); gpr_free(sc); } @@ -419,49 +382,33 @@ static grpc_security_status ssl_create_handshaker( return GRPC_SECURITY_OK; } -static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_channel_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, - gpr_timespec deadline, - grpc_security_handshake_done_cb cb, - void *user_data) { +static void ssl_channel_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; - tsi_handshaker *handshaker; - grpc_security_status status = ssl_create_handshaker( - c->handshaker_factory, true, - c->overridden_target_name != NULL ? c->overridden_target_name - : c->target_name, - &handshaker); - if (status != GRPC_SECURITY_OK) { - gpr_free(read_buffer); - cb(exec_ctx, user_data, status, NULL, NULL); - } else { - grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true, - nonsecure_endpoint, read_buffer, deadline, cb, - user_data); - } -} - -static void ssl_server_do_handshake( + // Instantiate TSI handshaker. + tsi_handshaker *tsi_hs = NULL; + ssl_create_handshaker(c->handshaker_factory, true /* is_client */, + c->overridden_target_name != NULL + ? c->overridden_target_name + : c->target_name, + &tsi_hs); + // Create handshakers. + grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr); +} + +static void ssl_server_create_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, - grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, gpr_timespec deadline, - grpc_security_handshake_done_cb cb, void *user_data) { + grpc_handshake_manager *handshake_mgr) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; - tsi_handshaker *handshaker; - grpc_security_status status = - ssl_create_handshaker(c->handshaker_factory, false, NULL, &handshaker); - if (status != GRPC_SECURITY_OK) { - gpr_free(read_buffer); - cb(exec_ctx, user_data, status, NULL, NULL); - } else { - grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, false, - nonsecure_endpoint, read_buffer, deadline, cb, - user_data); - } + // Instantiate TSI handshaker. + tsi_handshaker *tsi_hs = NULL; + ssl_create_handshaker(c->handshaker_factory, false /* is_client */, + NULL /* peer_name */, &tsi_hs); + // Create handshakers. + grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr); } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { @@ -518,57 +465,53 @@ grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer) { return ctx; } -static grpc_security_status ssl_check_peer(grpc_security_connector *sc, - const char *peer_name, - const tsi_peer *peer, - grpc_auth_context **auth_context) { +static grpc_error *ssl_check_peer(grpc_security_connector *sc, + const char *peer_name, const tsi_peer *peer, + grpc_auth_context **auth_context) { /* Check the ALPN. */ const tsi_peer_property *p = tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL); if (p == NULL) { - gpr_log(GPR_ERROR, "Missing selected ALPN property."); - return GRPC_SECURITY_ERROR; + return GRPC_ERROR_CREATE( + "Cannot check peer: missing selected ALPN property."); } if (!grpc_chttp2_is_alpn_version_supported(p->value.data, p->value.length)) { - gpr_log(GPR_ERROR, "Invalid ALPN value."); - return GRPC_SECURITY_ERROR; + return GRPC_ERROR_CREATE("Cannot check peer: invalid ALPN value."); } /* Check the peer name if specified. */ if (peer_name != NULL && !ssl_host_matches_name(peer, peer_name)) { - gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", peer_name); - return GRPC_SECURITY_ERROR; + char *msg; + gpr_asprintf(&msg, "Peer name %s is not in peer certificate", peer_name); + grpc_error *error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return error; } *auth_context = tsi_ssl_peer_to_auth_context(peer); - return GRPC_SECURITY_OK; + return GRPC_ERROR_NONE; } static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, - grpc_security_peer_check_cb cb, - void *user_data) { + grpc_auth_context **auth_context, + grpc_closure *on_peer_checked) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; - grpc_security_status status; - grpc_auth_context *auth_context = NULL; - status = ssl_check_peer(sc, c->overridden_target_name != NULL - ? c->overridden_target_name - : c->target_name, - &peer, &auth_context); - cb(exec_ctx, user_data, status, auth_context); - grpc_auth_context_unref(auth_context); + grpc_error *error = ssl_check_peer(sc, c->overridden_target_name != NULL + ? c->overridden_target_name + : c->target_name, + &peer, auth_context); + grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); tsi_peer_destruct(&peer); } static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, - grpc_security_peer_check_cb cb, - void *user_data) { - grpc_auth_context *auth_context = NULL; - grpc_security_status status = ssl_check_peer(sc, NULL, &peer, &auth_context); + grpc_auth_context **auth_context, + grpc_closure *on_peer_checked) { + grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context); tsi_peer_destruct(&peer); - cb(exec_ctx, user_data, status, auth_context); - grpc_auth_context_unref(auth_context); + grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL); } static void add_shallow_auth_property_to_peer(tsi_peer *peer, @@ -765,7 +708,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; - c->base.do_handshake = ssl_channel_do_handshake; + c->base.create_handshakers = ssl_channel_create_handshakers; gpr_split_host_port(target_name, &c->target_name, &port); gpr_free(port); if (overridden_target_name != NULL) { @@ -840,8 +783,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( *sc = NULL; goto error; } - gpr_mu_init(&c->base.mu); - c->base.do_handshake = ssl_server_do_handshake; + c->base.create_handshakers = ssl_server_create_handshakers; *sc = &c->base; gpr_free((void *)alpn_protocol_strings); gpr_free(alpn_protocol_string_lengths); diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index dc02692b01..696db0e02e 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -35,6 +35,8 @@ #define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_CONNECTOR_H #include <grpc/grpc_security.h> + +#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/tsi/transport_security_interface.h" @@ -57,21 +59,11 @@ typedef struct grpc_security_connector grpc_security_connector; #define GRPC_SECURITY_CONNECTOR_ARG "grpc.security_connector" -typedef void (*grpc_security_peer_check_cb)(grpc_exec_ctx *exec_ctx, - void *user_data, - grpc_security_status status, - grpc_auth_context *auth_context); - -/* Ownership of the secure_endpoint is transfered. */ -typedef void (*grpc_security_handshake_done_cb)( - grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status, - grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context); - typedef struct { void (*destroy)(grpc_security_connector *sc); void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, - tsi_peer peer, grpc_security_peer_check_cb cb, - void *user_data); + tsi_peer peer, grpc_auth_context **auth_context, + grpc_closure *on_peer_checked); } grpc_security_connector_vtable; typedef struct grpc_security_connector_handshake_list { @@ -106,12 +98,12 @@ void grpc_security_connector_unref(grpc_security_connector *policy); #endif /* Check the peer. Callee takes ownership of the peer object. - The callback will include the resulting auth_context. */ + Sets *auth_context and invokes on_peer_checked when done. */ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, - grpc_security_peer_check_cb cb, - void *user_data); + grpc_auth_context **auth_context, + grpc_closure *on_peer_checked); /* Util to encapsulate the connector in a channel arg. */ grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc); @@ -141,11 +133,9 @@ struct grpc_channel_security_connector { grpc_channel_security_connector *sc, const char *host, grpc_auth_context *auth_context, grpc_security_call_host_check_cb cb, void *user_data); - void (*do_handshake)(grpc_exec_ctx *exec_ctx, - grpc_channel_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, gpr_timespec deadline, - grpc_security_handshake_done_cb cb, void *user_data); + void (*create_handshakers)(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + grpc_handshake_manager *handshake_mgr); }; /* Checks that the host that will be set for a call is acceptable. */ @@ -154,11 +144,10 @@ void grpc_channel_security_connector_check_call_host( const char *host, grpc_auth_context *auth_context, grpc_security_call_host_check_cb cb, void *user_data); -/* Handshake. */ -void grpc_channel_security_connector_do_handshake( +/* Registers handshakers with \a handshake_mgr. */ +void grpc_channel_security_connector_create_handshakers( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, - grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer, - gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data); + grpc_handshake_manager *handshake_mgr); /* --- server_security_connector object. --- @@ -169,25 +158,14 @@ typedef struct grpc_server_security_connector grpc_server_security_connector; struct grpc_server_security_connector { grpc_security_connector base; - gpr_mu mu; - grpc_security_connector_handshake_list *handshaking_handshakes; - const grpc_channel_args *channel_args; - void (*do_handshake)(grpc_exec_ctx *exec_ctx, - grpc_server_security_connector *sc, - grpc_tcp_server_acceptor *acceptor, - grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, gpr_timespec deadline, - grpc_security_handshake_done_cb cb, void *user_data); + void (*create_handshakers)(grpc_exec_ctx *exec_ctx, + grpc_server_security_connector *sc, + grpc_handshake_manager *handshake_mgr); }; -void grpc_server_security_connector_do_handshake( +void grpc_server_security_connector_create_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, - grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, - grpc_slice_buffer *read_buffer, gpr_timespec deadline, - grpc_security_handshake_done_cb cb, void *user_data); - -void grpc_server_security_connector_shutdown( - grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector); + grpc_handshake_manager *handshake_mgr); /* --- Creation security connectors. --- */ diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c new file mode 100644 index 0000000000..fc01bec2f2 --- /dev/null +++ b/src/core/lib/security/transport/security_handshaker.c @@ -0,0 +1,450 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/security/transport/security_handshaker.h" + +#include <stdbool.h> +#include <string.h> + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/security/context/security_context.h" +#include "src/core/lib/security/transport/secure_endpoint.h" +#include "src/core/lib/security/transport/tsi_error.h" + +#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256 + +typedef struct { + grpc_handshaker base; + + // State set at creation time. + tsi_handshaker *handshaker; + grpc_security_connector *connector; + + gpr_mu mu; + gpr_refcount refs; + + bool shutdown; + // Endpoint and read buffer to destroy after a shutdown. + grpc_endpoint *endpoint_to_destroy; + grpc_slice_buffer *read_buffer_to_destroy; + + // State saved while performing the handshake. + grpc_handshaker_args *args; + grpc_closure *on_handshake_done; + + unsigned char *handshake_buffer; + size_t handshake_buffer_size; + grpc_slice_buffer left_overs; + grpc_slice_buffer outgoing; + grpc_closure on_handshake_data_sent_to_peer; + grpc_closure on_handshake_data_received_from_peer; + grpc_closure on_peer_checked; + grpc_auth_context *auth_context; +} security_handshaker; + +static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, + security_handshaker *h) { + if (gpr_unref(&h->refs)) { + gpr_mu_destroy(&h->mu); + tsi_handshaker_destroy(h->handshaker); + if (h->endpoint_to_destroy != NULL) { + grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy); + } + if (h->read_buffer_to_destroy != NULL) { + grpc_slice_buffer_destroy(h->read_buffer_to_destroy); + gpr_free(h->read_buffer_to_destroy); + } + gpr_free(h->handshake_buffer); + grpc_slice_buffer_destroy(&h->left_overs); + grpc_slice_buffer_destroy(&h->outgoing); + GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); + GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); + gpr_free(h); + } +} + +// Set args fields to NULL, saving the endpoint and read buffer for +// later destruction. +static void cleanup_args_for_failure_locked(security_handshaker *h) { + h->endpoint_to_destroy = h->args->endpoint; + h->args->endpoint = NULL; + h->read_buffer_to_destroy = h->args->read_buffer; + h->args->read_buffer = NULL; + grpc_channel_args_destroy(h->args->args); + h->args->args = NULL; +} + +// If the handshake failed or we're shutting down, clean up and invoke the +// callback with the error. +static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, + security_handshaker *h, + grpc_error *error) { + if (error == GRPC_ERROR_NONE) { + // If we were shut down after the handshake succeeded but before an + // endpoint callback was invoked, we need to generate our own error. + error = GRPC_ERROR_CREATE("Handshaker shutdown"); + } + const char *msg = grpc_error_string(error); + gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg); + grpc_error_free_string(msg); + if (!h->shutdown) { + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint); + // Not shutting down, so the write failed. Clean up before + // invoking the callback. + cleanup_args_for_failure_locked(h); + } + // Invoke callback. + grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL); +} + +static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + security_handshaker *h = arg; + gpr_mu_lock(&h->mu); + if (error != GRPC_ERROR_NONE || h->shutdown) { + security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); + goto done; + } + // Get frame protector. + tsi_frame_protector *protector; + tsi_result result = + tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); + if (result != TSI_OK) { + error = grpc_set_tsi_error_result( + GRPC_ERROR_CREATE("Frame protector creation failed"), result); + security_handshake_failed_locked(exec_ctx, h, error); + goto done; + } + // Success. + // Create secure endpoint. + h->args->endpoint = grpc_secure_endpoint_create( + protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count); + h->left_overs.count = 0; + h->left_overs.length = 0; + // Clear out the read buffer before it gets passed to the transport, + // since any excess bytes were already copied to h->left_overs. + grpc_slice_buffer_reset_and_unref(h->args->read_buffer); + // Add auth context to channel args. + grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context); + grpc_channel_args *tmp_args = h->args->args; + h->args->args = + grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1); + grpc_channel_args_destroy(tmp_args); + // Invoke callback. + grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE, NULL); + // Set shutdown to true so that subsequent calls to + // security_handshaker_shutdown() do nothing. + h->shutdown = true; +done: + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); +} + +static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, + security_handshaker *h) { + tsi_peer peer; + tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); + if (result != TSI_OK) { + return grpc_set_tsi_error_result( + GRPC_ERROR_CREATE("Peer extraction failed"), result); + } + grpc_security_connector_check_peer(exec_ctx, h->connector, peer, + &h->auth_context, &h->on_peer_checked); + return GRPC_ERROR_NONE; +} + +static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, + security_handshaker *h) { + // Get data to send. + tsi_result result = TSI_OK; + size_t offset = 0; + do { + size_t to_send_size = h->handshake_buffer_size - offset; + result = tsi_handshaker_get_bytes_to_send_to_peer( + h->handshaker, h->handshake_buffer + offset, &to_send_size); + offset += to_send_size; + if (result == TSI_INCOMPLETE_DATA) { + h->handshake_buffer_size *= 2; + h->handshake_buffer = + gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); + } + } while (result == TSI_INCOMPLETE_DATA); + if (result != TSI_OK) { + return grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Handshake failed"), + result); + } + // Send data. + grpc_slice to_send = + grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); + grpc_slice_buffer_reset_and_unref(&h->outgoing); + grpc_slice_buffer_add(&h->outgoing, to_send); + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, + &h->on_handshake_data_sent_to_peer); + return GRPC_ERROR_NONE; +} + +static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + security_handshaker *h = arg; + gpr_mu_lock(&h->mu); + if (error != GRPC_ERROR_NONE || h->shutdown) { + security_handshake_failed_locked( + exec_ctx, h, + GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1)); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } + // Process received data. + tsi_result result = TSI_OK; + size_t consumed_slice_size = 0; + size_t i; + for (i = 0; i < h->args->read_buffer->count; i++) { + consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); + result = tsi_handshaker_process_bytes_from_peer( + h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), + &consumed_slice_size); + if (!tsi_handshaker_is_in_progress(h->handshaker)) break; + } + if (tsi_handshaker_is_in_progress(h->handshaker)) { + /* We may need more data. */ + if (result == TSI_INCOMPLETE_DATA) { + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + goto done; + } else { + error = send_handshake_bytes_to_peer_locked(exec_ctx, h); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } + goto done; + } + } + if (result != TSI_OK) { + security_handshake_failed_locked( + exec_ctx, h, grpc_set_tsi_error_result( + GRPC_ERROR_CREATE("Handshake failed"), result)); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } + /* Handshake is done and successful this point. */ + bool has_left_overs_in_current_slice = + (consumed_slice_size < + GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i])); + size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + + h->args->read_buffer->count - i - 1; + if (num_left_overs > 0) { + /* Put the leftovers in our buffer (ownership transfered). */ + if (has_left_overs_in_current_slice) { + grpc_slice_buffer_add( + &h->left_overs, + grpc_slice_split_tail(&h->args->read_buffer->slices[i], + consumed_slice_size)); + /* split_tail above increments refcount. */ + grpc_slice_unref(h->args->read_buffer->slices[i]); + } + grpc_slice_buffer_addn( + &h->left_overs, &h->args->read_buffer->slices[i + 1], + num_left_overs - (size_t)has_left_overs_in_current_slice); + } + // Check peer. + error = check_peer_locked(exec_ctx, h); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } +done: + gpr_mu_unlock(&h->mu); +} + +static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + security_handshaker *h = arg; + gpr_mu_lock(&h->mu); + if (error != GRPC_ERROR_NONE || h->shutdown) { + security_handshake_failed_locked( + exec_ctx, h, + GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1)); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } + /* We may be done. */ + if (tsi_handshaker_is_in_progress(h->handshaker)) { + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + } else { + error = check_peer_locked(exec_ctx, h); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } + } + gpr_mu_unlock(&h->mu); +} + +// +// public handshaker API +// + +static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx, + grpc_handshaker *handshaker) { + security_handshaker *h = (security_handshaker *)handshaker; + security_handshaker_unref(exec_ctx, h); +} + +static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx, + grpc_handshaker *handshaker) { + security_handshaker *h = (security_handshaker *)handshaker; + gpr_mu_lock(&h->mu); + if (!h->shutdown) { + h->shutdown = true; + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint); + cleanup_args_for_failure_locked(h); + } + gpr_mu_unlock(&h->mu); +} + +static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, + grpc_handshaker *handshaker, + grpc_tcp_server_acceptor *acceptor, + grpc_closure *on_handshake_done, + grpc_handshaker_args *args) { + security_handshaker *h = (security_handshaker *)handshaker; + gpr_mu_lock(&h->mu); + h->args = args; + h->on_handshake_done = on_handshake_done; + gpr_ref(&h->refs); + grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(exec_ctx, h); + return; + } + gpr_mu_unlock(&h->mu); +} + +static const grpc_handshaker_vtable security_handshaker_vtable = { + security_handshaker_destroy, security_handshaker_shutdown, + security_handshaker_do_handshake}; + +static grpc_handshaker *security_handshaker_create( + grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, + grpc_security_connector *connector) { + security_handshaker *h = gpr_malloc(sizeof(security_handshaker)); + memset(h, 0, sizeof(security_handshaker)); + grpc_handshaker_init(&security_handshaker_vtable, &h->base); + h->handshaker = handshaker; + h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); + gpr_mu_init(&h->mu); + gpr_ref_init(&h->refs, 1); + h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; + h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); + grpc_closure_init(&h->on_handshake_data_sent_to_peer, + on_handshake_data_sent_to_peer, h); + grpc_closure_init(&h->on_handshake_data_received_from_peer, + on_handshake_data_received_from_peer, h); + grpc_closure_init(&h->on_peer_checked, on_peer_checked, h); + grpc_slice_buffer_init(&h->left_overs); + grpc_slice_buffer_init(&h->outgoing); + return &h->base; +} + +// +// fail_handshaker +// + +static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx, + grpc_handshaker *handshaker) { + gpr_free(handshaker); +} + +static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx, + grpc_handshaker *handshaker) {} + +static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, + grpc_handshaker *handshaker, + grpc_tcp_server_acceptor *acceptor, + grpc_closure *on_handshake_done, + grpc_handshaker_args *args) { + grpc_exec_ctx_sched(exec_ctx, on_handshake_done, + GRPC_ERROR_CREATE("Failed to create security handshaker"), + NULL); +} + +static const grpc_handshaker_vtable fail_handshaker_vtable = { + fail_handshaker_destroy, fail_handshaker_shutdown, + fail_handshaker_do_handshake}; + +static grpc_handshaker *fail_handshaker_create() { + grpc_handshaker *h = gpr_malloc(sizeof(*h)); + grpc_handshaker_init(&fail_handshaker_vtable, h); + return h; +} + +// +// exported functions +// + +void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx, + tsi_handshaker *handshaker, + grpc_security_connector *connector, + grpc_handshake_manager *handshake_mgr) { + // If no TSI handshaker was created, add a handshaker that always fails. + // Otherwise, add a real security handshaker. + if (handshaker == NULL) { + grpc_handshake_manager_add(handshake_mgr, fail_handshaker_create()); + } else { + grpc_handshake_manager_add( + handshake_mgr, + security_handshaker_create(exec_ctx, handshaker, connector)); + } +} diff --git a/src/core/lib/security/transport/handshake.h b/src/core/lib/security/transport/security_handshaker.h index f894540515..f71f43a359 100644 --- a/src/core/lib/security/transport/handshake.h +++ b/src/core/lib/security/transport/security_handshaker.h @@ -31,20 +31,17 @@ * */ -#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H -#define GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H +#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H +#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/security/transport/security_connector.h" -/* Calls the callback upon completion. Takes owership of handshaker and - * read_buffer. */ -void grpc_do_security_handshake( - grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, - grpc_security_connector *connector, bool is_client_side, - grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer, - gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data); +/// Creates any necessary security handshakers and adds them to +/// \a handshake_mgr. +void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx, + tsi_handshaker *handshaker, + grpc_security_connector *connector, + grpc_handshake_manager *handshake_mgr); -void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, void *handshake); - -#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H */ +#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */ diff --git a/src/core/lib/support/backoff.c b/src/core/lib/support/backoff.c index e89ef47220..0612472712 100644 --- a/src/core/lib/support/backoff.c +++ b/src/core/lib/support/backoff.c @@ -35,8 +35,10 @@ #include <grpc/support/useful.h> -void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, +void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, + double multiplier, double jitter, int64_t min_timeout_millis, int64_t max_timeout_millis) { + backoff->initial_connect_timeout = initial_connect_timeout; backoff->multiplier = multiplier; backoff->jitter = jitter; backoff->min_timeout_millis = min_timeout_millis; @@ -45,9 +47,10 @@ void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, } gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) { - backoff->current_timeout_millis = backoff->min_timeout_millis; - return gpr_time_add( - now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); + backoff->current_timeout_millis = backoff->initial_connect_timeout; + const int64_t first_timeout = + GPR_MAX(backoff->current_timeout_millis, backoff->min_timeout_millis); + return gpr_time_add(now, gpr_time_from_millis(first_timeout, GPR_TIMESPAN)); } /* Generate a random number between 0 and 1. */ @@ -57,20 +60,28 @@ static double generate_uniform_random_number(uint32_t *rng_state) { } gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { - double new_timeout_millis = + const double new_timeout_millis = backoff->multiplier * (double)backoff->current_timeout_millis; - double jitter_range = backoff->jitter * new_timeout_millis; - double jitter = + backoff->current_timeout_millis = + GPR_MIN((int64_t)new_timeout_millis, backoff->max_timeout_millis); + + const double jitter_range_width = backoff->jitter * new_timeout_millis; + const double jitter = (2 * generate_uniform_random_number(&backoff->rng_state) - 1) * - jitter_range; + jitter_range_width; + backoff->current_timeout_millis = - GPR_CLAMP((int64_t)(new_timeout_millis + jitter), - backoff->min_timeout_millis, backoff->max_timeout_millis); - return gpr_time_add( + (int64_t)((double)(backoff->current_timeout_millis) + jitter); + + const gpr_timespec current_deadline = gpr_time_add( now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); + + const gpr_timespec min_deadline = gpr_time_add( + now, gpr_time_from_millis(backoff->min_timeout_millis, GPR_TIMESPAN)); + + return gpr_time_max(current_deadline, min_deadline); } void gpr_backoff_reset(gpr_backoff *backoff) { - // forces step() to return a timeout of min_timeout_millis - backoff->current_timeout_millis = 0; + backoff->current_timeout_millis = backoff->initial_connect_timeout; } diff --git a/src/core/lib/support/backoff.h b/src/core/lib/support/backoff.h index 6d40c15546..5e9b740824 100644 --- a/src/core/lib/support/backoff.h +++ b/src/core/lib/support/backoff.h @@ -37,7 +37,9 @@ #include <grpc/support/time.h> typedef struct { - /// const: multiplier between retry attempts + /// const: how long to wait after the first failure before retrying + int64_t initial_connect_timeout; + /// const: factor with which to multiply backoff after a failed retry double multiplier; /// const: amount to randomize backoffs double jitter; @@ -54,7 +56,8 @@ typedef struct { } gpr_backoff; /// Initialize backoff machinery - does not need to be destroyed -void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, +void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout, + double multiplier, double jitter, int64_t min_timeout_millis, int64_t max_timeout_millis); /// Begin retry loop: returns a timespec for the NEXT retry |