diff options
Diffstat (limited to 'src/core/ext')
15 files changed, 1040 insertions, 872 deletions
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 82042897b2..572af52dfd 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -41,9 +41,9 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_channel/uri_parser.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" typedef struct http_connect_handshaker { @@ -53,27 +53,38 @@ typedef struct http_connect_handshaker { char* proxy_server; char* server_name; + gpr_refcount refcount; + gpr_mu mu; + + bool shutdown; + // Endpoint and read buffer to destroy after a shutdown. + grpc_endpoint* endpoint_to_destroy; + grpc_slice_buffer* read_buffer_to_destroy; + // State saved while performing the handshake. - grpc_endpoint* endpoint; - grpc_channel_args* args; - grpc_handshaker_done_cb cb; - void* user_data; + grpc_handshaker_args* args; + grpc_closure* on_handshake_done; // Objects for processing the HTTP CONNECT request and response. grpc_slice_buffer write_buffer; - grpc_slice_buffer* read_buffer; // Ownership passes through this object. grpc_closure request_done_closure; grpc_closure response_read_closure; grpc_http_parser http_parser; grpc_http_response http_response; - grpc_timer timeout_timer; - - gpr_refcount refcount; } http_connect_handshaker; // Unref and clean up handshaker. -static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { +static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, + http_connect_handshaker* handshaker) { if (gpr_unref(&handshaker->refcount)) { + gpr_mu_destroy(&handshaker->mu); + if (handshaker->endpoint_to_destroy != NULL) { + grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy); + } + if (handshaker->read_buffer_to_destroy != NULL) { + grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy); + gpr_free(handshaker->read_buffer_to_destroy); + } gpr_free(handshaker->proxy_server); gpr_free(handshaker->server_name); grpc_slice_buffer_destroy(&handshaker->write_buffer); @@ -83,28 +94,64 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { } } -// Callback invoked when deadline is exceeded. -static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - http_connect_handshaker* handshaker = arg; - if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); +// Set args fields to NULL, saving the endpoint and read buffer for +// later destruction. +static void cleanup_args_for_failure_locked( + http_connect_handshaker* handshaker) { + handshaker->endpoint_to_destroy = handshaker->args->endpoint; + handshaker->args->endpoint = NULL; + handshaker->read_buffer_to_destroy = handshaker->args->read_buffer; + handshaker->args->read_buffer = NULL; + grpc_channel_args_destroy(handshaker->args->args); + handshaker->args->args = NULL; +} + +// If the handshake failed or we're shutting down, clean up and invoke the +// callback with the error. +static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, + http_connect_handshaker* handshaker, + grpc_error* error) { + if (error == GRPC_ERROR_NONE) { + // If we were shut down after an endpoint operation succeeded but + // before the endpoint callback was invoked, we need to generate our + // own error. + error = GRPC_ERROR_CREATE("Handshaker shutdown"); } - http_connect_handshaker_unref(handshaker); + if (!handshaker->shutdown) { + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + // Not shutting down, so the handshake failed. Clean up before + // invoking the callback. + cleanup_args_for_failure_locked(handshaker); + // Set shutdown to true so that subsequent calls to + // http_connect_handshaker_shutdown() do nothing. + handshaker->shutdown = true; + } + // Invoke callback. + grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); } // Callback invoked when finished writing HTTP CONNECT request. static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; - if (error != GRPC_ERROR_NONE) { - // If the write failed, invoke the callback immediately with the error. - handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, - handshaker->read_buffer, handshaker->user_data, - GRPC_ERROR_REF(error)); + gpr_mu_lock(&handshaker->mu); + if (error != GRPC_ERROR_NONE || handshaker->shutdown) { + // If the write failed or we're shutting down, clean up and invoke the + // callback with the error. + handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&handshaker->mu); + http_connect_handshaker_unref(exec_ctx, handshaker); } else { // Otherwise, read the response. - grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + // The read callback inherits our ref to the handshaker. + grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, + handshaker->args->read_buffer, &handshaker->response_read_closure); + gpr_mu_unlock(&handshaker->mu); } } @@ -112,36 +159,40 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; - if (error != GRPC_ERROR_NONE) { - GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. + gpr_mu_lock(&handshaker->mu); + if (error != GRPC_ERROR_NONE || handshaker->shutdown) { + // If the read failed or we're shutting down, clean up and invoke the + // callback with the error. + handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error)); goto done; } // Add buffer to parser. - for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { - if (GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { + for (size_t i = 0; i < handshaker->args->read_buffer->count; ++i) { + if (GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i]) > 0) { size_t body_start_offset = 0; error = grpc_http_parser_parse(&handshaker->http_parser, - handshaker->read_buffer->slices[i], + handshaker->args->read_buffer->slices[i], &body_start_offset); - if (error != GRPC_ERROR_NONE) goto done; + if (error != GRPC_ERROR_NONE) { + handshake_failed_locked(exec_ctx, handshaker, error); + goto done; + } if (handshaker->http_parser.state == GRPC_HTTP_BODY) { - // We've gotten back a successul response, so stop the timeout timer. - grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); // Remove the data we've already read from the read buffer, // leaving only the leftover bytes (if any). grpc_slice_buffer tmp_buffer; grpc_slice_buffer_init(&tmp_buffer); if (body_start_offset < - GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i])) { grpc_slice_buffer_add( &tmp_buffer, - grpc_slice_split_tail(&handshaker->read_buffer->slices[i], + grpc_slice_split_tail(&handshaker->args->read_buffer->slices[i], body_start_offset)); } grpc_slice_buffer_addn(&tmp_buffer, - &handshaker->read_buffer->slices[i + 1], - handshaker->read_buffer->count - i - 1); - grpc_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); + &handshaker->args->read_buffer->slices[i + 1], + handshaker->args->read_buffer->count - i - 1); + grpc_slice_buffer_swap(handshaker->args->read_buffer, &tmp_buffer); grpc_slice_buffer_destroy(&tmp_buffer); break; } @@ -159,9 +210,11 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). if (handshaker->http_parser.state != GRPC_HTTP_BODY) { - grpc_slice_buffer_reset_and_unref(handshaker->read_buffer); - grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + grpc_slice_buffer_reset_and_unref(handshaker->args->read_buffer); + grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, + handshaker->args->read_buffer, &handshaker->response_read_closure); + gpr_mu_unlock(&handshaker->mu); return; } // Make sure we got a 2xx response. @@ -172,11 +225,17 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, handshaker->http_response.status); error = GRPC_ERROR_CREATE(msg); gpr_free(msg); + handshake_failed_locked(exec_ctx, handshaker, error); + goto done; } + // Success. Invoke handshake-done callback. + grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); done: - // Invoke handshake-done callback. - handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, - handshaker->read_buffer, handshaker->user_data, error); + // Set shutdown to true so that subsequent calls to + // http_connect_handshaker_shutdown() do nothing. + handshaker->shutdown = true; + gpr_mu_unlock(&handshaker->mu); + http_connect_handshaker_unref(exec_ctx, handshaker); } // @@ -186,25 +245,30 @@ done: static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) {} + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + gpr_mu_lock(&handshaker->mu); + if (!handshaker->shutdown) { + handshaker->shutdown = true; + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + cleanup_args_for_failure_locked(handshaker); + } + gpr_mu_unlock(&handshaker->mu); +} static void http_connect_handshaker_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, - grpc_endpoint* endpoint, grpc_channel_args* args, - grpc_slice_buffer* read_buffer, gpr_timespec deadline, - grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, - void* user_data) { + grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, + grpc_handshaker_args* args) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + gpr_mu_lock(&handshaker->mu); // Save state in the handshaker object. - handshaker->endpoint = endpoint; handshaker->args = args; - handshaker->cb = cb; - handshaker->user_data = user_data; - handshaker->read_buffer = read_buffer; + handshaker->on_handshake_done = on_handshake_done; // Send HTTP CONNECT request. gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", handshaker->server_name, handshaker->proxy_server); @@ -216,16 +280,14 @@ static void http_connect_handshaker_do_handshake( request.handshaker = &grpc_httpcli_plaintext; grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); - grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, - &handshaker->request_done_closure); - // Set timeout timer. The timer gets a reference to the handshaker. + // Take a new ref to be held by the write callback. gpr_ref(&handshaker->refcount); - grpc_timer_init(exec_ctx, &handshaker->timeout_timer, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer, + &handshaker->request_done_closure); + gpr_mu_unlock(&handshaker->mu); } -static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = { +static const grpc_handshaker_vtable http_connect_handshaker_vtable = { http_connect_handshaker_destroy, http_connect_handshaker_shutdown, http_connect_handshaker_do_handshake}; @@ -233,10 +295,11 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, const char* server_name) { GPR_ASSERT(proxy_server != NULL); GPR_ASSERT(server_name != NULL); - http_connect_handshaker* handshaker = - gpr_malloc(sizeof(http_connect_handshaker)); + http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker)); grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); + gpr_mu_init(&handshaker->mu); + gpr_ref_init(&handshaker->refcount, 1); handshaker->proxy_server = gpr_strdup(proxy_server); handshaker->server_name = gpr_strdup(server_name); grpc_slice_buffer_init(&handshaker->write_buffer); @@ -246,7 +309,6 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, handshaker); grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, &handshaker->http_response); - gpr_ref_init(&handshaker->refcount, 1); return &handshaker->base; } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 2ab07db30c..f294e69392 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -336,16 +336,18 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + int min_backoff_ms = GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (c->args) { for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { + "grpc.testing.fixed_reconnect_backoff_ms")) { GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); fixed_reconnect_backoff = true; - initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], - (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); + initial_backoff_ms = min_backoff_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -362,11 +364,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } } gpr_backoff_init( - &c->backoff_state, + &c->backoff_state, initial_backoff_ms, fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - initial_backoff_ms, max_backoff_ms); + min_backoff_ms, max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -643,9 +645,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); /* setup subchannel watching connected subchannel for changes; subchannel - ref - for connecting is donated - to the state watcher */ + ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 4262d2b9a4..df0db61c22 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -123,10 +123,11 @@ #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/static_metadata.h" -#define BACKOFF_MULTIPLIER 1.6 -#define BACKOFF_JITTER 0.2 -#define BACKOFF_MIN_SECONDS 10 -#define BACKOFF_MAX_SECONDS 60 +#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 +#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_GRPCLB_RECONNECT_JITTER 0.2 int grpc_lb_glb_trace = 0; @@ -1107,9 +1108,12 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) { grpc_closure_init(&glb_policy->lb_on_response_received, lb_on_response_received, glb_policy); - gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER, - BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, - BACKOFF_MAX_SECONDS * 1000); + gpr_backoff_init(&glb_policy->lb_call_backoff_state, + GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_GRPCLB_RECONNECT_JITTER, + GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); } static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 665439f360..15476f5792 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -46,10 +46,11 @@ #include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" -#define BACKOFF_MULTIPLIER 1.6 -#define BACKOFF_JITTER 0.2 -#define BACKOFF_MIN_SECONDS 1 -#define BACKOFF_MAX_SECONDS 120 +#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 +#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_DNS_RECONNECT_JITTER 0.2 typedef struct { /** base class: must be first */ @@ -269,8 +270,11 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, server_name_arg.value.string = (char *)path; r->channel_args = grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1); - gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, - BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); + gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_DNS_RECONNECT_JITTER, + GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); return &r->base; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c new file mode 100644 index 0000000000..213395c20f --- /dev/null +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -0,0 +1,261 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" + +#include <grpc/grpc.h> + +#include <string.h> + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/client_channel/connector.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/security/transport/security_connector.h" + +typedef struct { + grpc_connector base; + + gpr_mu mu; + gpr_refcount refs; + + bool shutdown; + + char *server_name; + grpc_chttp2_create_handshakers_func create_handshakers; + void *create_handshakers_user_data; + + grpc_closure *notify; + grpc_connect_in_args args; + grpc_connect_out_args *result; + grpc_closure initial_string_sent; + grpc_slice_buffer initial_string_buffer; + + grpc_endpoint *endpoint; // Non-NULL until handshaking starts. + + grpc_closure connected; + + grpc_handshake_manager *handshake_mgr; +} chttp2_connector; + +static void chttp2_connector_ref(grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_ref(&c->refs); +} + +static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, + grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + if (gpr_unref(&c->refs)) { + /* c->initial_string_buffer does not need to be destroyed */ + gpr_mu_destroy(&c->mu); + // If handshaking is not yet in progress, destroy the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint); + gpr_free(c->server_name); + gpr_free(c); + } +} + +static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, + grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_mu_lock(&c->mu); + c->shutdown = true; + if (c->handshake_mgr != NULL) { + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + } + // If handshaking is not yet in progress, shutdown the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + gpr_mu_unlock(&c->mu); +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + chttp2_connector *c = args->user_data; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + // We were shut down after handshaking completed successfully, so + // destroy the endpoint here. + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_channel_args_destroy(args->args); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + } else { + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, + args->read_buffer); + c->result->channel_args = args->args; + } + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); + c->handshake_mgr = NULL; + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, (grpc_connector *)c); +} + +static void start_handshake_locked(grpc_exec_ctx *exec_ctx, + chttp2_connector *c) { + c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, c->server_name)); + gpr_free(proxy_name); + } + if (c->create_handshakers != NULL) { + c->create_handshakers(exec_ctx, c->create_handshakers_user_data, + c->handshake_mgr); + } + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->endpoint = NULL; // Endpoint handed off to handshake manager. +} + +static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + chttp2_connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, arg); + } else { + start_handshake_locked(exec_ctx, c); + gpr_mu_unlock(&c->mu); + } +} + +static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + chttp2_connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, arg); + } else { + GPR_ASSERT(c->endpoint != NULL); + if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, + c); + grpc_slice_buffer_init(&c->initial_string_buffer); + grpc_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); + grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer, + &c->initial_string_sent); + } else { + start_handshake_locked(exec_ctx, c); + } + gpr_mu_unlock(&c->mu); + } +} + +static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, + grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_closure *notify) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->notify == NULL); + c->notify = notify; + c->args = *args; + c->result = result; + GPR_ASSERT(c->endpoint == NULL); + chttp2_connector_ref(con); // Ref taken for callback. + grpc_closure_init(&c->connected, connected, c); + grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, + args->interested_parties, args->channel_args, + args->addr, args->deadline); + gpr_mu_unlock(&c->mu); +} + +static const grpc_connector_vtable chttp2_connector_vtable = { + chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, + chttp2_connector_connect}; + +grpc_connector *grpc_chttp2_connector_create( + grpc_exec_ctx *exec_ctx, const char *server_name, + grpc_chttp2_create_handshakers_func create_handshakers, + void *create_handshakers_user_data) { + chttp2_connector *c = gpr_malloc(sizeof(*c)); + memset(c, 0, sizeof(*c)); + c->base.vtable = &chttp2_connector_vtable; + gpr_mu_init(&c->mu); + gpr_ref_init(&c->refs, 1); + c->server_name = gpr_strdup(server_name); + c->create_handshakers = create_handshakers; + c->create_handshakers_user_data = create_handshakers_user_data; + return &c->base; +} diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h new file mode 100644 index 0000000000..6c34ce1af1 --- /dev/null +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H + +#include "src/core/ext/client_channel/connector.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +typedef void (*grpc_chttp2_create_handshakers_func)( + grpc_exec_ctx* exec_ctx, void* user_data, + grpc_handshake_manager* handshake_mgr); + +/// If \a create_handshakers is non-NULL, it will be called with +/// \a create_handshakers_user_data to add handshakers. +grpc_connector* grpc_chttp2_connector_create( + grpc_exec_ctx* exec_ctx, const char* server_name, + grpc_chttp2_create_handshakers_func create_handshakers, + void* create_handshakers_user_data); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 9e0478feab..7325f9413d 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -33,137 +33,17 @@ #include <grpc/grpc.h> -#include <stdlib.h> #include <string.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/compress_filter.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_client_filter.h" -#include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -// -// connector -// - -typedef struct { - grpc_connector base; - gpr_refcount refs; - - grpc_closure *notify; - grpc_connect_in_args args; - grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; - - grpc_endpoint *tcp; - - grpc_closure connected; - - grpc_handshake_manager *handshake_mgr; -} connector; - -static void connector_ref(grpc_connector *con) { - connector *c = (connector *)con; - gpr_ref(&c->refs); -} - -static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ - grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); - gpr_free(c); - } -} - -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector_unref(exec_ctx, arg); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - connector *c = user_data; - if (error != GRPC_ERROR_NONE) { - grpc_channel_args_destroy(args); - gpr_free(read_buffer); - } else { - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); - GPR_ASSERT(c->result->transport); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, - read_buffer); - c->result->channel_args = args; - } - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); -} - -static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - connector *c = arg; - grpc_endpoint *tcp = c->tcp; - if (tcp != NULL) { - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - connector_ref(arg); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); - } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} - -static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, - const grpc_connect_in_args *args, - grpc_connect_out_args *result, - grpc_closure *notify) { - connector *c = (connector *)con; - GPR_ASSERT(c->notify == NULL); - GPR_ASSERT(notify->cb); - c->notify = notify; - c->args = *args; - c->result = result; - c->tcp = NULL; - grpc_closure_init(&c->connected, connected, c); - grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, - args->interested_parties, args->channel_args, - args->addr, args->deadline); -} - -static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_shutdown, connector_connect}; - -// -// client_channel_factory -// - static void client_channel_factory_ref( grpc_client_channel_factory *cc_factory) {} @@ -173,20 +53,11 @@ static void client_channel_factory_unref( static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - c->base.vtable = &connector_vtable; - gpr_ref_init(&c->refs, 1); - c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add( - c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name, args->server_name)); - gpr_free(proxy_name); - } - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); - grpc_connector_unref(exec_ctx, &c->base); + grpc_connector *connector = grpc_chttp2_connector_create( + exec_ctx, args->server_name, NULL /* create_handshakers */, + NULL /* user_data */); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_connector_unref(exec_ctx, connector); return s; } @@ -229,7 +100,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, GRPC_API_TRACE( "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3, (target, args, reserved)); - GPR_ASSERT(!reserved); + GPR_ASSERT(reserved == NULL); grpc_client_channel_factory *factory = (grpc_client_channel_factory *)&client_channel_factory; // Add channel args containing the server name and client channel factory. diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index be57f30bd0..63f2488088 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -33,195 +33,18 @@ #include <grpc/grpc.h> -#include <stdlib.h> #include <string.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/iomgr/tcp_client.h" -#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/auth_filters.h" +#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -#include "src/core/lib/tsi/transport_security_interface.h" - -// -// connector -// - -typedef struct { - grpc_connector base; - gpr_refcount refs; - - grpc_channel_security_connector *security_connector; - - grpc_closure *notify; - grpc_connect_in_args args; - grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; - - gpr_mu mu; - grpc_endpoint *connecting_endpoint; - grpc_endpoint *newly_connecting_endpoint; - - grpc_closure connected_closure; - - grpc_handshake_manager *handshake_mgr; - - // TODO(roth): Remove once we eliminate on_secure_handshake_done(). - grpc_channel_args *tmp_args; -} connector; - -static void connector_ref(grpc_connector *con) { - connector *c = (connector *)con; - gpr_ref(&c->refs); -} - -static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ - grpc_channel_args_destroy(c->tmp_args); - grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); - gpr_free(c); - } -} - -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - connector *c = arg; - gpr_mu_lock(&c->mu); - grpc_error *error = GRPC_ERROR_NONE; - if (c->connecting_endpoint == NULL) { - memset(c->result, 0, sizeof(*c->result)); - gpr_mu_unlock(&c->mu); - } else if (status != GRPC_SECURITY_OK) { - error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), - GRPC_ERROR_INT_SECURITY_STATUS, status); - memset(c->result, 0, sizeof(*c->result)); - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - } else { - grpc_arg auth_context_arg; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - c->result->transport = grpc_create_chttp2_transport( - exec_ctx, c->args.channel_args, secure_endpoint, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL); - auth_context_arg = grpc_auth_context_to_arg(auth_context); - c->result->channel_args = - grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); - } - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - connector *c = user_data; - c->tmp_args = args; - if (error != GRPC_ERROR_NONE) { - gpr_free(read_buffer); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); - } else { - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, endpoint, read_buffer, - c->args.deadline, on_secure_handshake_done, c); - } -} - -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector *c = arg; - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); -} - -static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - connector *c = arg; - grpc_endpoint *tcp = c->newly_connecting_endpoint; - if (tcp != NULL) { - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - c->connecting_endpoint = tcp; - gpr_mu_unlock(&c->mu); - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); - } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - grpc_endpoint *ep; - gpr_mu_lock(&c->mu); - ep = c->connecting_endpoint; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - if (ep) { - grpc_endpoint_shutdown(exec_ctx, ep); - } -} - -static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, - const grpc_connect_in_args *args, - grpc_connect_out_args *result, - grpc_closure *notify) { - connector *c = (connector *)con; - GPR_ASSERT(c->notify == NULL); - c->notify = notify; - c->args = *args; - c->result = result; - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - gpr_mu_unlock(&c->mu); - grpc_closure_init(&c->connected_closure, connected, c); - grpc_tcp_client_connect( - exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->channel_args, args->addr, args->deadline); -} - -static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_shutdown, connector_connect}; - -// -// client_channel_factory -// typedef struct { grpc_client_channel_factory base; @@ -245,26 +68,21 @@ static void client_channel_factory_unref( } } +static void create_handshakers(grpc_exec_ctx *exec_ctx, + void *security_connector, + grpc_handshake_manager *handshake_mgr) { + grpc_channel_security_connector_create_handshakers( + exec_ctx, security_connector, handshake_mgr); +} + static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { client_channel_factory *f = (client_channel_factory *)cc_factory; - connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - c->base.vtable = &connector_vtable; - c->security_connector = f->security_connector; - c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add( - c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name, args->server_name)); - gpr_free(proxy_name); - } - gpr_mu_init(&c->mu); - gpr_ref_init(&c->refs, 1); - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); - grpc_connector_unref(exec_ctx, &c->base); + grpc_connector *connector = grpc_chttp2_connector_create( + exec_ctx, args->server_name, create_handshakers, f->security_connector); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_connector_unref(exec_ctx, connector); return s; } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c new file mode 100644 index 0000000000..7795606e73 --- /dev/null +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -0,0 +1,356 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" + +#include <grpc/grpc.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/server.h" + +void grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr) { + if (handshaker_factory != NULL) { + handshaker_factory->vtable->create_handshakers( + exec_ctx, handshaker_factory, handshake_mgr); + } +} + +void grpc_chttp2_server_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory) { + if (handshaker_factory != NULL) { + handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); + } +} + + +typedef struct pending_handshake_manager_node { + grpc_handshake_manager *handshake_mgr; + struct pending_handshake_manager_node *next; +} pending_handshake_manager_node; + +typedef struct { + grpc_server *server; + grpc_tcp_server *tcp_server; + grpc_channel_args *args; + grpc_chttp2_server_handshaker_factory *handshaker_factory; + gpr_mu mu; + bool shutdown; + grpc_closure tcp_server_shutdown_complete; + grpc_closure *server_destroy_listener_done; + pending_handshake_manager_node *pending_handshake_mgrs; +} server_state; + +typedef struct { + server_state *server_state; + grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; +} server_connection_state; + +static void pending_handshake_manager_add_locked( + server_state *state, grpc_handshake_manager *handshake_mgr) { + pending_handshake_manager_node *node = gpr_malloc(sizeof(*node)); + node->handshake_mgr = handshake_mgr; + node->next = state->pending_handshake_mgrs; + state->pending_handshake_mgrs = node; +} + +static void pending_handshake_manager_remove_locked( + server_state *state, grpc_handshake_manager *handshake_mgr) { + pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs; + for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + if (node->handshake_mgr == handshake_mgr) { + *prev_node = node->next; + gpr_free(node); + break; + } + prev_node = &node->next; + } +} + +static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, + server_state *state) { + pending_handshake_manager_node *prev_node = NULL; + for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + gpr_free(prev_node); + prev_node = node; + } + gpr_free(prev_node); + state->pending_handshake_mgrs = NULL; +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + server_connection_state *connection_state = args->user_data; + gpr_mu_lock(&connection_state->server_state->mu); + if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { + const char *error_str = grpc_error_string(error); + gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); + grpc_error_free_string(error_str); + if (error == GRPC_ERROR_NONE) { + // We were shut down after handshaking completed successfully, so + // destroy the endpoint here. + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_channel_args_destroy(args->args); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } + } else { + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, args->args); + grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); + grpc_channel_args_destroy(args->args); + } + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); + gpr_mu_unlock(&connection_state->server_state->mu); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state); +} + +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, + grpc_tcp_server_acceptor *acceptor) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); + grpc_endpoint_destroy(exec_ctx, tcp); + return; + } + grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); + server_connection_state *connection_state = + gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = state; + connection_state->accepting_pollset = accepting_pollset; + connection_state->acceptor = acceptor; + connection_state->handshake_mgr = handshake_mgr; + grpc_chttp2_server_handshaker_factory_create_handshakers( + exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake( + exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline, + acceptor, on_handshake_done, connection_state); +} + +/* Server callback: start listening on our ports */ +static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, + void *arg, grpc_pollset **pollsets, + size_t pollset_count) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); +} + +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + server_state *state = arg; + /* ensure all threads have unlocked */ + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); + // Flush queued work before destroying handshaker factory, since that + // may do a synchronous unref. + grpc_exec_ctx_flush(exec_ctx); + grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, + state->handshaker_factory); + if (destroy_done != NULL) { + destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); + grpc_exec_ctx_flush(exec_ctx); + } + grpc_channel_args_destroy(state->args); + gpr_mu_destroy(&state->mu); + gpr_free(state); +} + +/* Server callback: destroy the tcp listener (so we don't generate further + callbacks) */ +static void server_destroy_listener(grpc_exec_ctx *exec_ctx, + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); +} + +grpc_error *grpc_chttp2_server_add_port( + grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, + grpc_channel_args *args, + grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) { + grpc_resolved_addresses *resolved = NULL; + grpc_tcp_server *tcp_server = NULL; + size_t i; + size_t count = 0; + int port_temp; + grpc_error *err = GRPC_ERROR_NONE; + server_state *state = NULL; + grpc_error **errors = NULL; + + *port_num = -1; + + /* resolve address */ + err = grpc_blocking_resolve_address(addr, "https", &resolved); + if (err != GRPC_ERROR_NONE) { + goto error; + } + state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); + err = + grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, + args, &tcp_server); + if (err != GRPC_ERROR_NONE) { + goto error; + } + + state->server = server; + state->tcp_server = tcp_server; + state->args = args; + state->handshaker_factory = handshaker_factory; + state->shutdown = true; + gpr_mu_init(&state->mu); + + const size_t naddrs = resolved->naddrs; + errors = gpr_malloc(sizeof(*errors) * naddrs); + for (i = 0; i < naddrs; i++) { + errors[i] = + grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp); + if (errors[i] == GRPC_ERROR_NONE) { + if (*port_num == -1) { + *port_num = port_temp; + } else { + GPR_ASSERT(*port_num == port_temp); + } + count++; + } + } + if (count == 0) { + char *msg; + gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", + naddrs); + err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); + gpr_free(msg); + goto error; + } else if (count != naddrs) { + char *msg; + gpr_asprintf(&msg, "Only %" PRIuPTR + " addresses added out of total %" PRIuPTR " resolved", + count, naddrs); + err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); + gpr_free(msg); + + const char *warning_message = grpc_error_string(err); + gpr_log(GPR_INFO, "WARNING: %s", warning_message); + grpc_error_free_string(warning_message); + /* we managed to bind some addresses: continue */ + } + grpc_resolved_addresses_destroy(resolved); + + /* Register with the server only upon success */ + grpc_server_add_listener(exec_ctx, server, state, server_start_listener, + server_destroy_listener); + goto done; + +/* Error path: cleanup and return */ +error: + GPR_ASSERT(err != GRPC_ERROR_NONE); + if (resolved) { + grpc_resolved_addresses_destroy(resolved); + } + if (tcp_server) { + grpc_tcp_server_unref(exec_ctx, tcp_server); + } else { + grpc_channel_args_destroy(args); + grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory); + gpr_free(state); + } + *port_num = 0; + +done: + if (errors != NULL) { + for (i = 0; i < naddrs; i++) { + GRPC_ERROR_UNREF(errors[i]); + } + gpr_free(errors); + } + return err; +} diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h new file mode 100644 index 0000000000..b1ff04bcbb --- /dev/null +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -0,0 +1,79 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +/// A server handshaker factory is used to create handshakers for server +/// connections. +typedef struct grpc_chttp2_server_handshaker_factory + grpc_chttp2_server_handshaker_factory; + +typedef struct { + void (*create_handshakers)( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr); + void (*destroy)(grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory); +} grpc_chttp2_server_handshaker_factory_vtable; + +struct grpc_chttp2_server_handshaker_factory { + const grpc_chttp2_server_handshaker_factory_vtable *vtable; +}; + +void grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr); + +void grpc_chttp2_server_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory); + +/// Adds a port to \a server. Sets \a port_num to the port number. +/// If \a handshaker_factory is not NULL, it will be used to create +/// handshakers for the port. +/// Takes ownership of \a args and \a handshaker_factory. +grpc_error *grpc_chttp2_server_add_port( + grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, + grpc_channel_args *args, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + int *port_num); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index c18d618f96..366312bd72 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -33,180 +33,28 @@ #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_connect_state { - grpc_server *server; - grpc_pollset *accepting_pollset; - grpc_tcp_server_acceptor *acceptor; - grpc_handshake_manager *handshake_mgr; -} server_connect_state; - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - server_connect_state *state = user_data; - if (error != GRPC_ERROR_NONE) { - const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); - grpc_error_free_string(error_str); - GRPC_ERROR_UNREF(error); - grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); - gpr_free(read_buffer); - } else { - // Beware that the call to grpc_create_chttp2_transport() has to happen - // before grpc_tcp_server_destroy(). This is fine here, but similar code - // asynchronously doing a handshake instead of calling - // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add - // synchronization to avoid this case. - grpc_transport *transport = - grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); - grpc_server_setup_transport(exec_ctx, state->server, transport, - state->accepting_pollset, - grpc_server_get_channel_args(state->server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer); - } - // Clean up. - grpc_channel_args_destroy(args); - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - gpr_free(state); -} - -static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, - grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); - state->server = server; - state->accepting_pollset = accepting_pollset; - state->acceptor = acceptor; - state->handshake_mgr = grpc_handshake_manager_create(); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - const gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), - deadline, acceptor, on_handshake_done, state); -} - -/* Server callback: start listening on our ports */ -static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_pollset **pollsets, size_t pollset_count) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, - server); -} - -/* Server callback: destroy the tcp listener (so we don't generate further - callbacks) */ -static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_closure *destroy_done) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); -} - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { - grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - size_t i; - size_t count = 0; - int port_num = -1; - int port_temp; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_error *err = GRPC_ERROR_NONE; - + int port_num = 0; GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2, (server, addr)); - - grpc_error **errors = NULL; - err = grpc_blocking_resolve_address(addr, "https", &resolved); - if (err != GRPC_ERROR_NONE) { - goto error; - } - - err = grpc_tcp_server_create(&exec_ctx, NULL, - grpc_server_get_channel_args(server), &tcp); + grpc_error* err = grpc_chttp2_server_add_port( + &exec_ctx, server, addr, + grpc_channel_args_copy(grpc_server_get_channel_args(server)), + NULL /* handshaker_factory */, &port_num); if (err != GRPC_ERROR_NONE) { - goto error; - } - - const size_t naddrs = resolved->naddrs; - errors = gpr_malloc(sizeof(*errors) * naddrs); - for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); - if (errors[i] == GRPC_ERROR_NONE) { - if (port_num == -1) { - port_num = port_temp; - } else { - GPR_ASSERT(port_num == port_temp); - } - count++; - } + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); } - if (count == 0) { - char *msg; - gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", - naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); - gpr_free(msg); - goto error; - } else if (count != naddrs) { - char *msg; - gpr_asprintf(&msg, "Only %" PRIuPTR - " addresses added out of total %" PRIuPTR " resolved", - count, naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); - gpr_free(msg); - - const char *warning_message = grpc_error_string(err); - gpr_log(GPR_INFO, "WARNING: %s", warning_message); - grpc_error_free_string(warning_message); - /* we managed to bind some addresses: continue */ - } - grpc_resolved_addresses_destroy(resolved); - - /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy); - goto done; - -/* Error path: cleanup and return */ -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); - } - port_num = 0; - - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "%s", msg); - grpc_error_free_string(msg); - GRPC_ERROR_UNREF(err); - -done: grpc_exec_ctx_finish(&exec_ctx); - if (errors != NULL) { - for (i = 0; i < naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - } - gpr_free(errors); return port_num; } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 942638ad7f..5f41728132 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -38,218 +38,63 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/auth_filters.h" -#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_secure_state { - grpc_server *server; - grpc_tcp_server *tcp; - grpc_server_security_connector *sc; - grpc_server_credentials *creds; - bool is_shutdown; - gpr_mu mu; - grpc_closure tcp_server_shutdown_complete; - grpc_closure *server_destroy_listener_done; -} server_secure_state; - -typedef struct server_secure_connect { - server_secure_state *server_state; - grpc_pollset *accepting_pollset; - grpc_tcp_server_acceptor *acceptor; - grpc_handshake_manager *handshake_mgr; - // TODO(roth): Remove the following two fields when we eliminate - // grpc_server_security_connector_do_handshake(). - gpr_timespec deadline; - grpc_channel_args *args; -} server_secure_connect; - -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - server_secure_connect *connection_state = statep; - if (status == GRPC_SECURITY_OK) { - if (secure_endpoint) { - gpr_mu_lock(&connection_state->server_state->mu); - if (!connection_state->server_state->is_shutdown) { - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args( - connection_state->server_state->server), - secure_endpoint, 0); - grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg( - connection_state->server_state->creds); - args_to_add[1] = grpc_auth_context_to_arg(auth_context); - grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( - connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, - connection_state->accepting_pollset, args_copy); - grpc_channel_args_destroy(args_copy); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); - } else { - /* We need to consume this here, because the server may already have - * gone away. */ - grpc_endpoint_destroy(exec_ctx, secure_endpoint); - } - gpr_mu_unlock(&connection_state->server_state->mu); - } - } else { - gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); - } - grpc_channel_args_destroy(connection_state->args); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - server_secure_connect *connection_state = user_data; - if (error != GRPC_ERROR_NONE) { - const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); - grpc_error_free_string(error_str); - GRPC_ERROR_UNREF(error); - grpc_channel_args_destroy(args); - gpr_free(read_buffer); - grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr); - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); - return; - } - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - connection_state->handshake_mgr = NULL; - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - connection_state->args = args; - grpc_server_security_connector_do_handshake( - exec_ctx, connection_state->server_state->sc, connection_state->acceptor, - endpoint, read_buffer, connection_state->deadline, - on_secure_handshake_done, connection_state); -} - -static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, - grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - server_secure_state *server_state = statep; - server_secure_connect *connection_state = NULL; - gpr_mu_lock(&server_state->mu); - if (server_state->is_shutdown) { - gpr_mu_unlock(&server_state->mu); - grpc_endpoint_destroy(exec_ctx, tcp); - return; - } - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_ref(server_state->tcp); - connection_state = gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = server_state; - connection_state->accepting_pollset = accepting_pollset; - connection_state->acceptor = acceptor; - connection_state->handshake_mgr = grpc_handshake_manager_create(); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - connection_state->deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, connection_state->handshake_mgr, tcp, - grpc_server_get_channel_args(connection_state->server_state->server), - connection_state->deadline, acceptor, on_handshake_done, - connection_state); +typedef struct { + grpc_chttp2_server_handshaker_factory base; + grpc_server_security_connector *security_connector; +} server_security_handshaker_factory; + +static void server_security_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, + grpc_handshake_manager *handshake_mgr) { + server_security_handshaker_factory *handshaker_factory = + (server_security_handshaker_factory *)hf; + grpc_server_security_connector_create_handshakers( + exec_ctx, handshaker_factory->security_connector, handshake_mgr); } -/* Server callback: start listening on our ports */ -static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, - void *statep, grpc_pollset **pollsets, - size_t pollset_count) { - server_secure_state *server_state = statep; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = false; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count, - on_accept, server_state); +static void server_security_handshaker_factory_destroy( + grpc_exec_ctx* exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { + server_security_handshaker_factory *handshaker_factory = + (server_security_handshaker_factory *)hf; + GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base, + "server"); + gpr_free(hf); } -static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep, - grpc_error *error) { - server_secure_state *server_state = statep; - /* ensure all threads have unlocked */ - gpr_mu_lock(&server_state->mu); - grpc_closure *destroy_done = server_state->server_destroy_listener_done; - GPR_ASSERT(server_state->is_shutdown); - gpr_mu_unlock(&server_state->mu); - /* clean up */ - grpc_server_security_connector_shutdown(exec_ctx, server_state->sc); - - /* Flush queued work before a synchronous unref. */ - grpc_exec_ctx_flush(exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server"); - grpc_server_credentials_unref(server_state->creds); - - if (destroy_done != NULL) { - destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); - grpc_exec_ctx_flush(exec_ctx); - } - gpr_free(server_state); -} - -static void server_destroy_listener(grpc_exec_ctx *exec_ctx, - grpc_server *server, void *statep, - grpc_closure *callback) { - server_secure_state *server_state = statep; - grpc_tcp_server *tcp; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = true; - server_state->server_destroy_listener_done = callback; - tcp = server_state->tcp; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, server_state->tcp); -} +static const grpc_chttp2_server_handshaker_factory_vtable + server_security_handshaker_factory_vtable = { + server_security_handshaker_factory_create_handshakers, + server_security_handshaker_factory_destroy}; int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { - grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - server_secure_state *server_state = NULL; - size_t i; - size_t count = 0; - int port_num = -1; - int port_temp; - grpc_security_status status = GRPC_SECURITY_ERROR; - grpc_server_security_connector *sc = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *err = GRPC_ERROR_NONE; - grpc_error **errors = NULL; - + grpc_server_security_connector *sc = NULL; + int port_num = 0; GRPC_API_TRACE( "grpc_server_add_secure_http2_port(" "server=%p, addr=%s, creds=%p)", 3, (server, addr, creds)); - - /* create security context */ + // Create security context. if (creds == NULL) { err = GRPC_ERROR_CREATE( "No credentials specified for secure server port (creds==NULL)"); - goto error; + goto done; } - status = grpc_server_credentials_create_security_connector(creds, &sc); + grpc_security_status status = + grpc_server_credentials_create_security_connector(creds, &sc); if (status != GRPC_SECURITY_OK) { char *msg; gpr_asprintf(&msg, @@ -258,107 +103,28 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, err = grpc_error_set_int(GRPC_ERROR_CREATE(msg), GRPC_ERROR_INT_SECURITY_STATUS, status); gpr_free(msg); - goto error; + goto done; } - sc->channel_args = grpc_server_get_channel_args(server); - - /* resolve address */ - err = grpc_blocking_resolve_address(addr, "https", &resolved); - if (err != GRPC_ERROR_NONE) { - goto error; - } - server_state = gpr_malloc(sizeof(*server_state)); - memset(server_state, 0, sizeof(*server_state)); - grpc_closure_init(&server_state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, server_state); - err = grpc_tcp_server_create(&exec_ctx, - &server_state->tcp_server_shutdown_complete, - grpc_server_get_channel_args(server), &tcp); + // Create handshaker factory. + server_security_handshaker_factory* handshaker_factory = + gpr_malloc(sizeof(*handshaker_factory)); + memset(handshaker_factory, 0, sizeof(*handshaker_factory)); + handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable; + handshaker_factory->security_connector = sc; + // Create channel args. + grpc_arg channel_arg = grpc_server_credentials_to_arg(creds); + grpc_channel_args *args = grpc_channel_args_copy_and_add( + grpc_server_get_channel_args(server), &channel_arg, 1); + // Add server port. + err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, + &handshaker_factory->base, &port_num); +done: + grpc_exec_ctx_finish(&exec_ctx); if (err != GRPC_ERROR_NONE) { - goto error; + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); } - - server_state->server = server; - server_state->tcp = tcp; - server_state->sc = sc; - server_state->creds = grpc_server_credentials_ref(creds); - server_state->is_shutdown = true; - gpr_mu_init(&server_state->mu); - - errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); - for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); - if (errors[i] == GRPC_ERROR_NONE) { - if (port_num == -1) { - port_num = port_temp; - } else { - GPR_ASSERT(port_num == port_temp); - } - count++; - } - } - if (count == 0) { - char *msg; - gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", - resolved->naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); - gpr_free(msg); - goto error; - } else if (count != resolved->naddrs) { - char *msg; - gpr_asprintf(&msg, "Only %" PRIuPTR - " addresses added out of total %" PRIuPTR " resolved", - count, resolved->naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); - gpr_free(msg); - - const char *warning_message = grpc_error_string(err); - gpr_log(GPR_INFO, "WARNING: %s", warning_message); - grpc_error_free_string(warning_message); - /* we managed to bind some addresses: continue */ - } else { - for (i = 0; i < resolved->naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - } - gpr_free(errors); - errors = NULL; - grpc_resolved_addresses_destroy(resolved); - - /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, server_state, - server_start_listener, server_destroy_listener); - - grpc_exec_ctx_finish(&exec_ctx); return port_num; - -/* Error path: cleanup and return */ -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (errors != NULL) { - for (i = 0; i < resolved->naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - gpr_free(errors); - } - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); - } else { - if (sc) { - grpc_exec_ctx_flush(&exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server"); - } - if (server_state) { - gpr_free(server_state); - } - } - grpc_exec_ctx_finish(&exec_ctx); - const char *msg = grpc_error_string(err); - GRPC_ERROR_UNREF(err); - gpr_log(GPR_ERROR, "%s", msg); - grpc_error_free_string(msg); - return 0; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 4e3c7ff681..3e7c078d3c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -111,9 +111,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *error); static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -428,6 +425,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_writable_stream(t, &s)) { + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); @@ -523,6 +521,10 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } } + if (s->fail_pending_writes_on_writes_finished_error != NULL) { + GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error); + } + GPR_ASSERT(s->send_initial_metadata_finished == NULL); GPR_ASSERT(s->fetching_send_message == NULL); GPR_ASSERT(s->send_trailing_metadata_finished == NULL); @@ -704,8 +706,6 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, } } - grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); - switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: GPR_UNREACHABLE_CODE(break); @@ -734,6 +734,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, break; } + grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GPR_TIMER_END("terminate_writing_with_lock", 0); } @@ -1404,6 +1406,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); } @@ -1534,9 +1537,41 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_error *error) { +void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + if (s->need_fail_pending_writes_on_writes_finished) { + grpc_error *error = s->fail_pending_writes_on_writes_finished_error; + s->fail_pending_writes_on_writes_finished_error = NULL; + s->need_fail_pending_writes_on_writes_finished = false; + grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error); + } +} + +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_error *error) { + if (s->need_fail_pending_writes_on_writes_finished || + (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE && + (s->included[GRPC_CHTTP2_LIST_WRITABLE] || + s->included[GRPC_CHTTP2_LIST_WRITING]))) { + /* If a write is in progress, and it involves this stream, wait for the + * write to complete before cancelling things out. If we don't do this, then + * our combiner lock might think that some operation on its queue might be + * covering a completion even though there is none, in which case we might + * offload to another thread, which isn't guarateed to exist */ + if (error != GRPC_ERROR_NONE) { + if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) { + s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE( + "Post-poned fail writes due to in-progress write"); + } + s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child( + s->fail_pending_writes_on_writes_finished_error, error); + } + s->need_fail_pending_writes_on_writes_finished = true; + return; /* early out */ + } + error = removal_error(error, s, "Pending writes failed due to stream closure"); s->send_initial_metadata = NULL; @@ -1590,7 +1625,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (close_writes && !s->write_closed) { s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed = true; - fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); } if (s->read_closed && s->write_closed) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b74233d992..6cba1e7fd2 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -409,6 +409,9 @@ struct grpc_chttp2_stream { grpc_error *read_closed_error; /** the error that resulted in this stream being write-closed */ grpc_error *write_closed_error; + /** should any writes be cleared once this stream becomes non-writable */ + bool need_fail_pending_writes_on_writes_finished; + grpc_error *fail_pending_writes_on_writes_finished_error; grpc_published_metadata_method published_metadata[2]; bool final_metadata_requested; @@ -689,4 +692,11 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s); +void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_error *error); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 139e7387c4..769b229a0d 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -208,6 +208,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); } } else { + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write"); } } @@ -252,6 +253,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, GRPC_ERROR_REF(error)); } + grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); } grpc_slice_buffer_reset_and_unref(&t->outbuf); |