aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-12-05 07:03:49 -0800
committerGravatar Mark D. Roth <roth@google.com>2016-12-05 07:03:49 -0800
commit16eaa454154122a5db61cd8058794af6c5fe9077 (patch)
treebccded87d75d5fdadf2a01a6498f0b7c0cf92650 /src/core
parent977f5d4e7dc8bdf21bda2e8b9a7a232f88de8e73 (diff)
parent8e9a492ad910ad66751fc1d0015e4ef051b62ddc (diff)
Merge remote-tracking branch 'upstream/master' into client_channel_init_cleanup
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c182
-rw-r--r--src/core/ext/client_channel/subchannel.c18
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c18
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c16
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c261
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.h52
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c145
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c210
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c356
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h79
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c172
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c338
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c53
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h10
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c2
-rw-r--r--src/core/lib/channel/handshaker.c211
-rw-r--r--src/core/lib/channel/handshaker.h83
-rw-r--r--src/core/lib/http/httpcli_security_connector.c90
-rw-r--r--src/core/lib/iomgr/combiner.c11
-rw-r--r--src/core/lib/iomgr/resource_quota.c12
-rw-r--r--src/core/lib/iomgr/udp_server.c6
-rw-r--r--src/core/lib/security/transport/handshake.c374
-rw-r--r--src/core/lib/security/transport/security_connector.c242
-rw-r--r--src/core/lib/security/transport/security_connector.h58
-rw-r--r--src/core/lib/security/transport/security_handshaker.c450
-rw-r--r--src/core/lib/security/transport/security_handshaker.h (renamed from src/core/lib/security/transport/handshake.h)21
-rw-r--r--src/core/lib/support/backoff.c37
-rw-r--r--src/core/lib/support/backoff.h7
28 files changed, 1873 insertions, 1641 deletions
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
index 82042897b2..572af52dfd 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -41,9 +41,9 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
typedef struct http_connect_handshaker {
@@ -53,27 +53,38 @@ typedef struct http_connect_handshaker {
char* proxy_server;
char* server_name;
+ gpr_refcount refcount;
+ gpr_mu mu;
+
+ bool shutdown;
+ // Endpoint and read buffer to destroy after a shutdown.
+ grpc_endpoint* endpoint_to_destroy;
+ grpc_slice_buffer* read_buffer_to_destroy;
+
// State saved while performing the handshake.
- grpc_endpoint* endpoint;
- grpc_channel_args* args;
- grpc_handshaker_done_cb cb;
- void* user_data;
+ grpc_handshaker_args* args;
+ grpc_closure* on_handshake_done;
// Objects for processing the HTTP CONNECT request and response.
grpc_slice_buffer write_buffer;
- grpc_slice_buffer* read_buffer; // Ownership passes through this object.
grpc_closure request_done_closure;
grpc_closure response_read_closure;
grpc_http_parser http_parser;
grpc_http_response http_response;
- grpc_timer timeout_timer;
-
- gpr_refcount refcount;
} http_connect_handshaker;
// Unref and clean up handshaker.
-static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
+ http_connect_handshaker* handshaker) {
if (gpr_unref(&handshaker->refcount)) {
+ gpr_mu_destroy(&handshaker->mu);
+ if (handshaker->endpoint_to_destroy != NULL) {
+ grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy);
+ }
+ if (handshaker->read_buffer_to_destroy != NULL) {
+ grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy);
+ gpr_free(handshaker->read_buffer_to_destroy);
+ }
gpr_free(handshaker->proxy_server);
gpr_free(handshaker->server_name);
grpc_slice_buffer_destroy(&handshaker->write_buffer);
@@ -83,28 +94,64 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
}
}
-// Callback invoked when deadline is exceeded.
-static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
- http_connect_handshaker* handshaker = arg;
- if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
- grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint);
+// Set args fields to NULL, saving the endpoint and read buffer for
+// later destruction.
+static void cleanup_args_for_failure_locked(
+ http_connect_handshaker* handshaker) {
+ handshaker->endpoint_to_destroy = handshaker->args->endpoint;
+ handshaker->args->endpoint = NULL;
+ handshaker->read_buffer_to_destroy = handshaker->args->read_buffer;
+ handshaker->args->read_buffer = NULL;
+ grpc_channel_args_destroy(handshaker->args->args);
+ handshaker->args->args = NULL;
+}
+
+// If the handshake failed or we're shutting down, clean up and invoke the
+// callback with the error.
+static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
+ http_connect_handshaker* handshaker,
+ grpc_error* error) {
+ if (error == GRPC_ERROR_NONE) {
+ // If we were shut down after an endpoint operation succeeded but
+ // before the endpoint callback was invoked, we need to generate our
+ // own error.
+ error = GRPC_ERROR_CREATE("Handshaker shutdown");
}
- http_connect_handshaker_unref(handshaker);
+ if (!handshaker->shutdown) {
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ // Not shutting down, so the handshake failed. Clean up before
+ // invoking the callback.
+ cleanup_args_for_failure_locked(handshaker);
+ // Set shutdown to true so that subsequent calls to
+ // http_connect_handshaker_shutdown() do nothing.
+ handshaker->shutdown = true;
+ }
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
}
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
- if (error != GRPC_ERROR_NONE) {
- // If the write failed, invoke the callback immediately with the error.
- handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
- handshaker->read_buffer, handshaker->user_data,
- GRPC_ERROR_REF(error));
+ gpr_mu_lock(&handshaker->mu);
+ if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
+ // If the write failed or we're shutting down, clean up and invoke the
+ // callback with the error.
+ handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error));
+ gpr_mu_unlock(&handshaker->mu);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
} else {
// Otherwise, read the response.
- grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ // The read callback inherits our ref to the handshaker.
+ grpc_endpoint_read(exec_ctx, handshaker->args->endpoint,
+ handshaker->args->read_buffer,
&handshaker->response_read_closure);
+ gpr_mu_unlock(&handshaker->mu);
}
}
@@ -112,36 +159,40 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
- if (error != GRPC_ERROR_NONE) {
- GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
+ gpr_mu_lock(&handshaker->mu);
+ if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
+ // If the read failed or we're shutting down, clean up and invoke the
+ // callback with the error.
+ handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error));
goto done;
}
// Add buffer to parser.
- for (size_t i = 0; i < handshaker->read_buffer->count; ++i) {
- if (GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) {
+ for (size_t i = 0; i < handshaker->args->read_buffer->count; ++i) {
+ if (GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i]) > 0) {
size_t body_start_offset = 0;
error = grpc_http_parser_parse(&handshaker->http_parser,
- handshaker->read_buffer->slices[i],
+ handshaker->args->read_buffer->slices[i],
&body_start_offset);
- if (error != GRPC_ERROR_NONE) goto done;
+ if (error != GRPC_ERROR_NONE) {
+ handshake_failed_locked(exec_ctx, handshaker, error);
+ goto done;
+ }
if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
- // We've gotten back a successul response, so stop the timeout timer.
- grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer);
// Remove the data we've already read from the read buffer,
// leaving only the leftover bytes (if any).
grpc_slice_buffer tmp_buffer;
grpc_slice_buffer_init(&tmp_buffer);
if (body_start_offset <
- GRPC_SLICE_LENGTH(handshaker->read_buffer->slices[i])) {
+ GRPC_SLICE_LENGTH(handshaker->args->read_buffer->slices[i])) {
grpc_slice_buffer_add(
&tmp_buffer,
- grpc_slice_split_tail(&handshaker->read_buffer->slices[i],
+ grpc_slice_split_tail(&handshaker->args->read_buffer->slices[i],
body_start_offset));
}
grpc_slice_buffer_addn(&tmp_buffer,
- &handshaker->read_buffer->slices[i + 1],
- handshaker->read_buffer->count - i - 1);
- grpc_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer);
+ &handshaker->args->read_buffer->slices[i + 1],
+ handshaker->args->read_buffer->count - i - 1);
+ grpc_slice_buffer_swap(handshaker->args->read_buffer, &tmp_buffer);
grpc_slice_buffer_destroy(&tmp_buffer);
break;
}
@@ -159,9 +210,11 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
// complete (e.g., handling chunked transfer encoding or looking
// at the Content-Length: header).
if (handshaker->http_parser.state != GRPC_HTTP_BODY) {
- grpc_slice_buffer_reset_and_unref(handshaker->read_buffer);
- grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer,
+ grpc_slice_buffer_reset_and_unref(handshaker->args->read_buffer);
+ grpc_endpoint_read(exec_ctx, handshaker->args->endpoint,
+ handshaker->args->read_buffer,
&handshaker->response_read_closure);
+ gpr_mu_unlock(&handshaker->mu);
return;
}
// Make sure we got a 2xx response.
@@ -172,11 +225,17 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
handshaker->http_response.status);
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
+ handshake_failed_locked(exec_ctx, handshaker, error);
+ goto done;
}
+ // Success. Invoke handshake-done callback.
+ grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
done:
- // Invoke handshake-done callback.
- handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args,
- handshaker->read_buffer, handshaker->user_data, error);
+ // Set shutdown to true so that subsequent calls to
+ // http_connect_handshaker_shutdown() do nothing.
+ handshaker->shutdown = true;
+ gpr_mu_unlock(&handshaker->mu);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
}
//
@@ -186,25 +245,30 @@ done:
static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
- http_connect_handshaker_unref(handshaker);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {}
+ grpc_handshaker* handshaker_in) {
+ http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ gpr_mu_lock(&handshaker->mu);
+ if (!handshaker->shutdown) {
+ handshaker->shutdown = true;
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ cleanup_args_for_failure_locked(handshaker);
+ }
+ gpr_mu_unlock(&handshaker->mu);
+}
static void http_connect_handshaker_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in,
- grpc_endpoint* endpoint, grpc_channel_args* args,
- grpc_slice_buffer* read_buffer, gpr_timespec deadline,
- grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb,
- void* user_data) {
+ grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
+ gpr_mu_lock(&handshaker->mu);
// Save state in the handshaker object.
- handshaker->endpoint = endpoint;
handshaker->args = args;
- handshaker->cb = cb;
- handshaker->user_data = user_data;
- handshaker->read_buffer = read_buffer;
+ handshaker->on_handshake_done = on_handshake_done;
// Send HTTP CONNECT request.
gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
handshaker->server_name, handshaker->proxy_server);
@@ -216,16 +280,14 @@ static void http_connect_handshaker_do_handshake(
request.handshaker = &grpc_httpcli_plaintext;
grpc_slice request_slice = grpc_httpcli_format_connect_request(&request);
grpc_slice_buffer_add(&handshaker->write_buffer, request_slice);
- grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer,
- &handshaker->request_done_closure);
- // Set timeout timer. The timer gets a reference to the handshaker.
+ // Take a new ref to be held by the write callback.
gpr_ref(&handshaker->refcount);
- grpc_timer_init(exec_ctx, &handshaker->timeout_timer,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer,
+ &handshaker->request_done_closure);
+ gpr_mu_unlock(&handshaker->mu);
}
-static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = {
+static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
http_connect_handshaker_do_handshake};
@@ -233,10 +295,11 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
const char* server_name) {
GPR_ASSERT(proxy_server != NULL);
GPR_ASSERT(server_name != NULL);
- http_connect_handshaker* handshaker =
- gpr_malloc(sizeof(http_connect_handshaker));
+ http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker));
memset(handshaker, 0, sizeof(*handshaker));
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
+ gpr_mu_init(&handshaker->mu);
+ gpr_ref_init(&handshaker->refcount, 1);
handshaker->proxy_server = gpr_strdup(proxy_server);
handshaker->server_name = gpr_strdup(server_name);
grpc_slice_buffer_init(&handshaker->write_buffer);
@@ -246,7 +309,6 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
handshaker);
grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE,
&handshaker->http_response);
- gpr_ref_init(&handshaker->refcount, 1);
return &handshaker->base;
}
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 2ab07db30c..f294e69392 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -336,16 +336,18 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
int initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ int min_backoff_ms = GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
- "grpc.testing.fixed_reconnect_backoff")) {
+ "grpc.testing.fixed_reconnect_backoff_ms")) {
GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
fixed_reconnect_backoff = true;
- initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i],
- (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
+ initial_backoff_ms = min_backoff_ms = max_backoff_ms =
+ grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
@@ -362,11 +364,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
}
gpr_backoff_init(
- &c->backoff_state,
+ &c->backoff_state, initial_backoff_ms,
fixed_reconnect_backoff ? 1.0
: GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
- initial_backoff_ms, max_backoff_ms);
+ min_backoff_ms, max_backoff_ms);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c);
@@ -643,9 +645,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
/* setup subchannel watching connected subchannel for changes; subchannel
- ref
- for connecting is donated
- to the state watcher */
+ ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 4262d2b9a4..df0db61c22 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -123,10 +123,11 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/static_metadata.h"
-#define BACKOFF_MULTIPLIER 1.6
-#define BACKOFF_JITTER 0.2
-#define BACKOFF_MIN_SECONDS 10
-#define BACKOFF_MAX_SECONDS 60
+#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
+#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
int grpc_lb_glb_trace = 0;
@@ -1107,9 +1108,12 @@ static void lb_call_init_locked(glb_lb_policy *glb_policy) {
grpc_closure_init(&glb_policy->lb_on_response_received,
lb_on_response_received, glb_policy);
- gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER,
- BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000,
- BACKOFF_MAX_SECONDS * 1000);
+ gpr_backoff_init(&glb_policy->lb_call_backoff_state,
+ GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
+ GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_GRPCLB_RECONNECT_JITTER,
+ GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
+ GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
}
static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 665439f360..15476f5792 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -46,10 +46,11 @@
#include "src/core/lib/support/backoff.h"
#include "src/core/lib/support/string.h"
-#define BACKOFF_MULTIPLIER 1.6
-#define BACKOFF_JITTER 0.2
-#define BACKOFF_MIN_SECONDS 1
-#define BACKOFF_MAX_SECONDS 120
+#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
+#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_DNS_RECONNECT_JITTER 0.2
typedef struct {
/** base class: must be first */
@@ -269,8 +270,11 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
server_name_arg.value.string = (char *)path;
r->channel_args =
grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1);
- gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
- BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
+ gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS,
+ GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_DNS_RECONNECT_JITTER,
+ GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
+ GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
return &r->base;
}
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
new file mode 100644
index 0000000000..213395c20f
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
+
+#include <grpc/grpc.h>
+
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/security/transport/security_connector.h"
+
+typedef struct {
+ grpc_connector base;
+
+ gpr_mu mu;
+ gpr_refcount refs;
+
+ bool shutdown;
+
+ char *server_name;
+ grpc_chttp2_create_handshakers_func create_handshakers;
+ void *create_handshakers_user_data;
+
+ grpc_closure *notify;
+ grpc_connect_in_args args;
+ grpc_connect_out_args *result;
+ grpc_closure initial_string_sent;
+ grpc_slice_buffer initial_string_buffer;
+
+ grpc_endpoint *endpoint; // Non-NULL until handshaking starts.
+
+ grpc_closure connected;
+
+ grpc_handshake_manager *handshake_mgr;
+} chttp2_connector;
+
+static void chttp2_connector_ref(grpc_connector *con) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ gpr_ref(&c->refs);
+}
+
+static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connector *con) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ if (gpr_unref(&c->refs)) {
+ /* c->initial_string_buffer does not need to be destroyed */
+ gpr_mu_destroy(&c->mu);
+ // If handshaking is not yet in progress, destroy the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint);
+ gpr_free(c->server_name);
+ gpr_free(c);
+ }
+}
+
+static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_connector *con) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ gpr_mu_lock(&c->mu);
+ c->shutdown = true;
+ if (c->handshake_mgr != NULL) {
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
+ }
+ // If handshaking is not yet in progress, shutdown the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ gpr_mu_unlock(&c->mu);
+}
+
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_handshaker_args *args = arg;
+ chttp2_connector *c = args->user_data;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ // We were shut down after handshaking completed successfully, so
+ // destroy the endpoint here.
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_channel_args_destroy(args->args);
+ grpc_slice_buffer_destroy(args->read_buffer);
+ gpr_free(args->read_buffer);
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ } else {
+ c->result->transport =
+ grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
+ GPR_ASSERT(c->result->transport);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
+ args->read_buffer);
+ c->result->channel_args = args->args;
+ }
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
+ c->handshake_mgr = NULL;
+ gpr_mu_unlock(&c->mu);
+ chttp2_connector_unref(exec_ctx, (grpc_connector *)c);
+}
+
+static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
+ chttp2_connector *c) {
+ c->handshake_mgr = grpc_handshake_manager_create();
+ char *proxy_name = grpc_get_http_proxy_server();
+ if (proxy_name != NULL) {
+ grpc_handshake_manager_add(
+ c->handshake_mgr,
+ grpc_http_connect_handshaker_create(proxy_name, c->server_name));
+ gpr_free(proxy_name);
+ }
+ if (c->create_handshakers != NULL) {
+ c->create_handshakers(exec_ctx, c->create_handshakers_user_data,
+ c->handshake_mgr);
+ }
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ c->endpoint = NULL; // Endpoint handed off to handshake manager.
+}
+
+static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ chttp2_connector *c = arg;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ gpr_mu_unlock(&c->mu);
+ chttp2_connector_unref(exec_ctx, arg);
+ } else {
+ start_handshake_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ }
+}
+
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ chttp2_connector *c = arg;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ gpr_mu_unlock(&c->mu);
+ chttp2_connector_unref(exec_ctx, arg);
+ } else {
+ GPR_ASSERT(c->endpoint != NULL);
+ if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
+ grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
+ c);
+ grpc_slice_buffer_init(&c->initial_string_buffer);
+ grpc_slice_buffer_add(&c->initial_string_buffer,
+ c->args.initial_connect_string);
+ grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer,
+ &c->initial_string_sent);
+ } else {
+ start_handshake_locked(exec_ctx, c);
+ }
+ gpr_mu_unlock(&c->mu);
+ }
+}
+
+static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
+ grpc_connector *con,
+ const grpc_connect_in_args *args,
+ grpc_connect_out_args *result,
+ grpc_closure *notify) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->notify == NULL);
+ c->notify = notify;
+ c->args = *args;
+ c->result = result;
+ GPR_ASSERT(c->endpoint == NULL);
+ chttp2_connector_ref(con); // Ref taken for callback.
+ grpc_closure_init(&c->connected, connected, c);
+ grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
+ args->interested_parties, args->channel_args,
+ args->addr, args->deadline);
+ gpr_mu_unlock(&c->mu);
+}
+
+static const grpc_connector_vtable chttp2_connector_vtable = {
+ chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown,
+ chttp2_connector_connect};
+
+grpc_connector *grpc_chttp2_connector_create(
+ grpc_exec_ctx *exec_ctx, const char *server_name,
+ grpc_chttp2_create_handshakers_func create_handshakers,
+ void *create_handshakers_user_data) {
+ chttp2_connector *c = gpr_malloc(sizeof(*c));
+ memset(c, 0, sizeof(*c));
+ c->base.vtable = &chttp2_connector_vtable;
+ gpr_mu_init(&c->mu);
+ gpr_ref_init(&c->refs, 1);
+ c->server_name = gpr_strdup(server_name);
+ c->create_handshakers = create_handshakers;
+ c->create_handshakers_user_data = create_handshakers_user_data;
+ return &c->base;
+}
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h
new file mode 100644
index 0000000000..6c34ce1af1
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
+
+#include "src/core/ext/client_channel/connector.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+typedef void (*grpc_chttp2_create_handshakers_func)(
+ grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_handshake_manager* handshake_mgr);
+
+/// If \a create_handshakers is non-NULL, it will be called with
+/// \a create_handshakers_user_data to add handshakers.
+grpc_connector* grpc_chttp2_connector_create(
+ grpc_exec_ctx* exec_ctx, const char* server_name,
+ grpc_chttp2_create_handshakers_func create_handshakers,
+ void* create_handshakers_user_data);
+
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 9e0478feab..7325f9413d 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -33,137 +33,17 @@
#include <grpc/grpc.h>
-#include <stdlib.h>
#include <string.h>
-#include <grpc/slice.h>
-#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/compress_filter.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
-//
-// connector
-//
-
-typedef struct {
- grpc_connector base;
- gpr_refcount refs;
-
- grpc_closure *notify;
- grpc_connect_in_args args;
- grpc_connect_out_args *result;
- grpc_closure initial_string_sent;
- grpc_slice_buffer initial_string_buffer;
-
- grpc_endpoint *tcp;
-
- grpc_closure connected;
-
- grpc_handshake_manager *handshake_mgr;
-} connector;
-
-static void connector_ref(grpc_connector *con) {
- connector *c = (connector *)con;
- gpr_ref(&c->refs);
-}
-
-static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- if (gpr_unref(&c->refs)) {
- /* c->initial_string_buffer does not need to be destroyed */
- grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
- gpr_free(c);
- }
-}
-
-static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- connector_unref(exec_ctx, arg);
-}
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- connector *c = user_data;
- if (error != GRPC_ERROR_NONE) {
- grpc_channel_args_destroy(args);
- gpr_free(read_buffer);
- } else {
- c->result->transport =
- grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
- GPR_ASSERT(c->result->transport);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
- read_buffer);
- c->result->channel_args = args;
- }
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
-}
-
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- connector *c = arg;
- grpc_endpoint *tcp = c->tcp;
- if (tcp != NULL) {
- if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
- grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c);
- grpc_slice_buffer_init(&c->initial_string_buffer);
- grpc_slice_buffer_add(&c->initial_string_buffer,
- c->args.initial_connect_string);
- connector_ref(arg);
- grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
- &c->initial_string_sent);
- } else {
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
- }
- } else {
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
- }
-}
-
-static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
-
-static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
- const grpc_connect_in_args *args,
- grpc_connect_out_args *result,
- grpc_closure *notify) {
- connector *c = (connector *)con;
- GPR_ASSERT(c->notify == NULL);
- GPR_ASSERT(notify->cb);
- c->notify = notify;
- c->args = *args;
- c->result = result;
- c->tcp = NULL;
- grpc_closure_init(&c->connected, connected, c);
- grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
- args->interested_parties, args->channel_args,
- args->addr, args->deadline);
-}
-
-static const grpc_connector_vtable connector_vtable = {
- connector_ref, connector_unref, connector_shutdown, connector_connect};
-
-//
-// client_channel_factory
-//
-
static void client_channel_factory_ref(
grpc_client_channel_factory *cc_factory) {}
@@ -173,20 +53,11 @@ static void client_channel_factory_unref(
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
- connector *c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- c->base.vtable = &connector_vtable;
- gpr_ref_init(&c->refs, 1);
- c->handshake_mgr = grpc_handshake_manager_create();
- char *proxy_name = grpc_get_http_proxy_server();
- if (proxy_name != NULL) {
- grpc_handshake_manager_add(
- c->handshake_mgr,
- grpc_http_connect_handshaker_create(proxy_name, args->server_name));
- gpr_free(proxy_name);
- }
- grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
- grpc_connector_unref(exec_ctx, &c->base);
+ grpc_connector *connector = grpc_chttp2_connector_create(
+ exec_ctx, args->server_name, NULL /* create_handshakers */,
+ NULL /* user_data */);
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
+ grpc_connector_unref(exec_ctx, connector);
return s;
}
@@ -229,7 +100,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
- GPR_ASSERT(!reserved);
+ GPR_ASSERT(reserved == NULL);
grpc_client_channel_factory *factory =
(grpc_client_channel_factory *)&client_channel_factory;
// Add channel args containing the server name and client channel factory.
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index be57f30bd0..63f2488088 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -33,195 +33,18 @@
#include <grpc/grpc.h>
-#include <stdlib.h>
#include <string.h>
-#include <grpc/slice.h>
-#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/iomgr/tcp_client.h"
-#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/transport/auth_filters.h"
+#include "src/core/lib/security/transport/security_connector.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
-#include "src/core/lib/tsi/transport_security_interface.h"
-
-//
-// connector
-//
-
-typedef struct {
- grpc_connector base;
- gpr_refcount refs;
-
- grpc_channel_security_connector *security_connector;
-
- grpc_closure *notify;
- grpc_connect_in_args args;
- grpc_connect_out_args *result;
- grpc_closure initial_string_sent;
- grpc_slice_buffer initial_string_buffer;
-
- gpr_mu mu;
- grpc_endpoint *connecting_endpoint;
- grpc_endpoint *newly_connecting_endpoint;
-
- grpc_closure connected_closure;
-
- grpc_handshake_manager *handshake_mgr;
-
- // TODO(roth): Remove once we eliminate on_secure_handshake_done().
- grpc_channel_args *tmp_args;
-} connector;
-
-static void connector_ref(grpc_connector *con) {
- connector *c = (connector *)con;
- gpr_ref(&c->refs);
-}
-
-static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- if (gpr_unref(&c->refs)) {
- /* c->initial_string_buffer does not need to be destroyed */
- grpc_channel_args_destroy(c->tmp_args);
- grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
- gpr_free(c);
- }
-}
-
-static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_security_status status,
- grpc_endpoint *secure_endpoint,
- grpc_auth_context *auth_context) {
- connector *c = arg;
- gpr_mu_lock(&c->mu);
- grpc_error *error = GRPC_ERROR_NONE;
- if (c->connecting_endpoint == NULL) {
- memset(c->result, 0, sizeof(*c->result));
- gpr_mu_unlock(&c->mu);
- } else if (status != GRPC_SECURITY_OK) {
- error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"),
- GRPC_ERROR_INT_SECURITY_STATUS, status);
- memset(c->result, 0, sizeof(*c->result));
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- } else {
- grpc_arg auth_context_arg;
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- c->result->transport = grpc_create_chttp2_transport(
- exec_ctx, c->args.channel_args, secure_endpoint, 1);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL);
- auth_context_arg = grpc_auth_context_to_arg(auth_context);
- c->result->channel_args =
- grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1);
- }
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
-}
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- connector *c = user_data;
- c->tmp_args = args;
- if (error != GRPC_ERROR_NONE) {
- gpr_free(read_buffer);
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
- } else {
- // TODO(roth, jboeuf): Convert security connector handshaking to use new
- // handshake API, and then move the code from on_secure_handshake_done()
- // into this function.
- grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, endpoint, read_buffer,
- c->args.deadline, on_secure_handshake_done, c);
- }
-}
-
-static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- connector *c = arg;
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
-}
-
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- connector *c = arg;
- grpc_endpoint *tcp = c->newly_connecting_endpoint;
- if (tcp != NULL) {
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->connecting_endpoint == NULL);
- c->connecting_endpoint = tcp;
- gpr_mu_unlock(&c->mu);
- if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
- grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c);
- grpc_slice_buffer_init(&c->initial_string_buffer);
- grpc_slice_buffer_add(&c->initial_string_buffer,
- c->args.initial_connect_string);
- grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
- &c->initial_string_sent);
- } else {
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
- }
- } else {
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
- }
-}
-
-static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- grpc_endpoint *ep;
- gpr_mu_lock(&c->mu);
- ep = c->connecting_endpoint;
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- if (ep) {
- grpc_endpoint_shutdown(exec_ctx, ep);
- }
-}
-
-static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
- const grpc_connect_in_args *args,
- grpc_connect_out_args *result,
- grpc_closure *notify) {
- connector *c = (connector *)con;
- GPR_ASSERT(c->notify == NULL);
- c->notify = notify;
- c->args = *args;
- c->result = result;
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->connecting_endpoint == NULL);
- gpr_mu_unlock(&c->mu);
- grpc_closure_init(&c->connected_closure, connected, c);
- grpc_tcp_client_connect(
- exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint,
- args->interested_parties, args->channel_args, args->addr, args->deadline);
-}
-
-static const grpc_connector_vtable connector_vtable = {
- connector_ref, connector_unref, connector_shutdown, connector_connect};
-
-//
-// client_channel_factory
-//
typedef struct {
grpc_client_channel_factory base;
@@ -245,26 +68,21 @@ static void client_channel_factory_unref(
}
}
+static void create_handshakers(grpc_exec_ctx *exec_ctx,
+ void *security_connector,
+ grpc_handshake_manager *handshake_mgr) {
+ grpc_channel_security_connector_create_handshakers(
+ exec_ctx, security_connector, handshake_mgr);
+}
+
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
- connector *c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- c->base.vtable = &connector_vtable;
- c->security_connector = f->security_connector;
- c->handshake_mgr = grpc_handshake_manager_create();
- char *proxy_name = grpc_get_http_proxy_server();
- if (proxy_name != NULL) {
- grpc_handshake_manager_add(
- c->handshake_mgr,
- grpc_http_connect_handshaker_create(proxy_name, args->server_name));
- gpr_free(proxy_name);
- }
- gpr_mu_init(&c->mu);
- gpr_ref_init(&c->refs, 1);
- grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
- grpc_connector_unref(exec_ctx, &c->base);
+ grpc_connector *connector = grpc_chttp2_connector_create(
+ exec_ctx, args->server_name, create_handshakers, f->security_connector);
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
+ grpc_connector_unref(exec_ctx, connector);
return s;
}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
new file mode 100644
index 0000000000..7795606e73
--- /dev/null
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -0,0 +1,356 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
+
+#include <grpc/grpc.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/channel/http_server_filter.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/server.h"
+
+void grpc_chttp2_server_handshaker_factory_create_handshakers(
+ grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory,
+ grpc_handshake_manager *handshake_mgr) {
+ if (handshaker_factory != NULL) {
+ handshaker_factory->vtable->create_handshakers(
+ exec_ctx, handshaker_factory, handshake_mgr);
+ }
+}
+
+void grpc_chttp2_server_handshaker_factory_destroy(
+ grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory) {
+ if (handshaker_factory != NULL) {
+ handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory);
+ }
+}
+
+
+typedef struct pending_handshake_manager_node {
+ grpc_handshake_manager *handshake_mgr;
+ struct pending_handshake_manager_node *next;
+} pending_handshake_manager_node;
+
+typedef struct {
+ grpc_server *server;
+ grpc_tcp_server *tcp_server;
+ grpc_channel_args *args;
+ grpc_chttp2_server_handshaker_factory *handshaker_factory;
+ gpr_mu mu;
+ bool shutdown;
+ grpc_closure tcp_server_shutdown_complete;
+ grpc_closure *server_destroy_listener_done;
+ pending_handshake_manager_node *pending_handshake_mgrs;
+} server_state;
+
+typedef struct {
+ server_state *server_state;
+ grpc_pollset *accepting_pollset;
+ grpc_tcp_server_acceptor *acceptor;
+ grpc_handshake_manager *handshake_mgr;
+} server_connection_state;
+
+static void pending_handshake_manager_add_locked(
+ server_state *state, grpc_handshake_manager *handshake_mgr) {
+ pending_handshake_manager_node *node = gpr_malloc(sizeof(*node));
+ node->handshake_mgr = handshake_mgr;
+ node->next = state->pending_handshake_mgrs;
+ state->pending_handshake_mgrs = node;
+}
+
+static void pending_handshake_manager_remove_locked(
+ server_state *state, grpc_handshake_manager *handshake_mgr) {
+ pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs;
+ for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
+ node != NULL; node = node->next) {
+ if (node->handshake_mgr == handshake_mgr) {
+ *prev_node = node->next;
+ gpr_free(node);
+ break;
+ }
+ prev_node = &node->next;
+ }
+}
+
+static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ server_state *state) {
+ pending_handshake_manager_node *prev_node = NULL;
+ for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
+ node != NULL; node = node->next) {
+ grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr);
+ gpr_free(prev_node);
+ prev_node = node;
+ }
+ gpr_free(prev_node);
+ state->pending_handshake_mgrs = NULL;
+}
+
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_handshaker_args *args = arg;
+ server_connection_state *connection_state = args->user_data;
+ gpr_mu_lock(&connection_state->server_state->mu);
+ if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) {
+ const char *error_str = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
+ grpc_error_free_string(error_str);
+ if (error == GRPC_ERROR_NONE) {
+ // We were shut down after handshaking completed successfully, so
+ // destroy the endpoint here.
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_channel_args_destroy(args->args);
+ grpc_slice_buffer_destroy(args->read_buffer);
+ gpr_free(args->read_buffer);
+ }
+ } else {
+ grpc_transport *transport =
+ grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
+ grpc_server_setup_transport(
+ exec_ctx, connection_state->server_state->server, transport,
+ connection_state->accepting_pollset, args->args);
+ grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer);
+ grpc_channel_args_destroy(args->args);
+ }
+ pending_handshake_manager_remove_locked(connection_state->server_state,
+ connection_state->handshake_mgr);
+ gpr_mu_unlock(&connection_state->server_state->mu);
+ grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
+ grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
+ gpr_free(connection_state);
+}
+
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+ grpc_pollset *accepting_pollset,
+ grpc_tcp_server_acceptor *acceptor) {
+ server_state *state = arg;
+ gpr_mu_lock(&state->mu);
+ if (state->shutdown) {
+ gpr_mu_unlock(&state->mu);
+ grpc_endpoint_destroy(exec_ctx, tcp);
+ return;
+ }
+ grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create();
+ pending_handshake_manager_add_locked(state, handshake_mgr);
+ gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_ref(state->tcp_server);
+ server_connection_state *connection_state =
+ gpr_malloc(sizeof(*connection_state));
+ connection_state->server_state = state;
+ connection_state->accepting_pollset = accepting_pollset;
+ connection_state->acceptor = acceptor;
+ connection_state->handshake_mgr = handshake_mgr;
+ grpc_chttp2_server_handshaker_factory_create_handshakers(
+ exec_ctx, state->handshaker_factory, connection_state->handshake_mgr);
+ // TODO(roth): We should really get this timeout value from channel
+ // args instead of hard-coding it.
+ const gpr_timespec deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline,
+ acceptor, on_handshake_done, connection_state);
+}
+
+/* Server callback: start listening on our ports */
+static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server,
+ void *arg, grpc_pollset **pollsets,
+ size_t pollset_count) {
+ server_state *state = arg;
+ gpr_mu_lock(&state->mu);
+ state->shutdown = false;
+ gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count,
+ on_accept, state);
+}
+
+static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ server_state *state = arg;
+ /* ensure all threads have unlocked */
+ gpr_mu_lock(&state->mu);
+ grpc_closure *destroy_done = state->server_destroy_listener_done;
+ GPR_ASSERT(state->shutdown);
+ pending_handshake_manager_shutdown_locked(exec_ctx, state);
+ gpr_mu_unlock(&state->mu);
+ // Flush queued work before destroying handshaker factory, since that
+ // may do a synchronous unref.
+ grpc_exec_ctx_flush(exec_ctx);
+ grpc_chttp2_server_handshaker_factory_destroy(exec_ctx,
+ state->handshaker_factory);
+ if (destroy_done != NULL) {
+ destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error));
+ grpc_exec_ctx_flush(exec_ctx);
+ }
+ grpc_channel_args_destroy(state->args);
+ gpr_mu_destroy(&state->mu);
+ gpr_free(state);
+}
+
+/* Server callback: destroy the tcp listener (so we don't generate further
+ callbacks) */
+static void server_destroy_listener(grpc_exec_ctx *exec_ctx,
+ grpc_server *server, void *arg,
+ grpc_closure *destroy_done) {
+ server_state *state = arg;
+ gpr_mu_lock(&state->mu);
+ state->shutdown = true;
+ state->server_destroy_listener_done = destroy_done;
+ grpc_tcp_server *tcp_server = state->tcp_server;
+ gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server);
+ grpc_tcp_server_unref(exec_ctx, tcp_server);
+}
+
+grpc_error *grpc_chttp2_server_add_port(
+ grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr,
+ grpc_channel_args *args,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) {
+ grpc_resolved_addresses *resolved = NULL;
+ grpc_tcp_server *tcp_server = NULL;
+ size_t i;
+ size_t count = 0;
+ int port_temp;
+ grpc_error *err = GRPC_ERROR_NONE;
+ server_state *state = NULL;
+ grpc_error **errors = NULL;
+
+ *port_num = -1;
+
+ /* resolve address */
+ err = grpc_blocking_resolve_address(addr, "https", &resolved);
+ if (err != GRPC_ERROR_NONE) {
+ goto error;
+ }
+ state = gpr_malloc(sizeof(*state));
+ memset(state, 0, sizeof(*state));
+ grpc_closure_init(&state->tcp_server_shutdown_complete,
+ tcp_server_shutdown_complete, state);
+ err =
+ grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
+ args, &tcp_server);
+ if (err != GRPC_ERROR_NONE) {
+ goto error;
+ }
+
+ state->server = server;
+ state->tcp_server = tcp_server;
+ state->args = args;
+ state->handshaker_factory = handshaker_factory;
+ state->shutdown = true;
+ gpr_mu_init(&state->mu);
+
+ const size_t naddrs = resolved->naddrs;
+ errors = gpr_malloc(sizeof(*errors) * naddrs);
+ for (i = 0; i < naddrs; i++) {
+ errors[i] =
+ grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp);
+ if (errors[i] == GRPC_ERROR_NONE) {
+ if (*port_num == -1) {
+ *port_num = port_temp;
+ } else {
+ GPR_ASSERT(*port_num == port_temp);
+ }
+ count++;
+ }
+ }
+ if (count == 0) {
+ char *msg;
+ gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
+ naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
+ gpr_free(msg);
+ goto error;
+ } else if (count != naddrs) {
+ char *msg;
+ gpr_asprintf(&msg, "Only %" PRIuPTR
+ " addresses added out of total %" PRIuPTR " resolved",
+ count, naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
+ gpr_free(msg);
+
+ const char *warning_message = grpc_error_string(err);
+ gpr_log(GPR_INFO, "WARNING: %s", warning_message);
+ grpc_error_free_string(warning_message);
+ /* we managed to bind some addresses: continue */
+ }
+ grpc_resolved_addresses_destroy(resolved);
+
+ /* Register with the server only upon success */
+ grpc_server_add_listener(exec_ctx, server, state, server_start_listener,
+ server_destroy_listener);
+ goto done;
+
+/* Error path: cleanup and return */
+error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
+ if (resolved) {
+ grpc_resolved_addresses_destroy(resolved);
+ }
+ if (tcp_server) {
+ grpc_tcp_server_unref(exec_ctx, tcp_server);
+ } else {
+ grpc_channel_args_destroy(args);
+ grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory);
+ gpr_free(state);
+ }
+ *port_num = 0;
+
+done:
+ if (errors != NULL) {
+ for (i = 0; i < naddrs; i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
+ gpr_free(errors);
+ }
+ return err;
+}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
new file mode 100644
index 0000000000..b1ff04bcbb
--- /dev/null
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+/// A server handshaker factory is used to create handshakers for server
+/// connections.
+typedef struct grpc_chttp2_server_handshaker_factory
+ grpc_chttp2_server_handshaker_factory;
+
+typedef struct {
+ void (*create_handshakers)(
+ grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory,
+ grpc_handshake_manager *handshake_mgr);
+ void (*destroy)(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory);
+} grpc_chttp2_server_handshaker_factory_vtable;
+
+struct grpc_chttp2_server_handshaker_factory {
+ const grpc_chttp2_server_handshaker_factory_vtable *vtable;
+};
+
+void grpc_chttp2_server_handshaker_factory_create_handshakers(
+ grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory,
+ grpc_handshake_manager *handshake_mgr);
+
+void grpc_chttp2_server_handshaker_factory_destroy(
+ grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory);
+
+/// Adds a port to \a server. Sets \a port_num to the port number.
+/// If \a handshaker_factory is not NULL, it will be used to create
+/// handshakers for the port.
+/// Takes ownership of \a args and \a handshaker_factory.
+grpc_error *grpc_chttp2_server_add_port(
+ grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr,
+ grpc_channel_args *args,
+ grpc_chttp2_server_handshaker_factory *handshaker_factory,
+ int *port_num);
+
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index c18d618f96..366312bd72 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -33,180 +33,28 @@
#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-typedef struct server_connect_state {
- grpc_server *server;
- grpc_pollset *accepting_pollset;
- grpc_tcp_server_acceptor *acceptor;
- grpc_handshake_manager *handshake_mgr;
-} server_connect_state;
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- server_connect_state *state = user_data;
- if (error != GRPC_ERROR_NONE) {
- const char *error_str = grpc_error_string(error);
- gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
- grpc_error_free_string(error_str);
- GRPC_ERROR_UNREF(error);
- grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
- gpr_free(read_buffer);
- } else {
- // Beware that the call to grpc_create_chttp2_transport() has to happen
- // before grpc_tcp_server_destroy(). This is fine here, but similar code
- // asynchronously doing a handshake instead of calling
- // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add
- // synchronization to avoid this case.
- grpc_transport *transport =
- grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0);
- grpc_server_setup_transport(exec_ctx, state->server, transport,
- state->accepting_pollset,
- grpc_server_get_channel_args(state->server));
- grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer);
- }
- // Clean up.
- grpc_channel_args_destroy(args);
- grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
- gpr_free(state);
-}
-
-static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp,
- grpc_pollset *accepting_pollset,
- grpc_tcp_server_acceptor *acceptor) {
- server_connect_state *state = gpr_malloc(sizeof(server_connect_state));
- state->server = server;
- state->accepting_pollset = accepting_pollset;
- state->acceptor = acceptor;
- state->handshake_mgr = grpc_handshake_manager_create();
- // TODO(roth): We should really get this timeout value from channel
- // args instead of hard-coding it.
- const gpr_timespec deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
- grpc_handshake_manager_do_handshake(
- exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server),
- deadline, acceptor, on_handshake_done, state);
-}
-
-/* Server callback: start listening on our ports */
-static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
- grpc_pollset **pollsets, size_t pollset_count) {
- grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept,
- server);
-}
-
-/* Server callback: destroy the tcp listener (so we don't generate further
- callbacks) */
-static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
- grpc_closure *destroy_done) {
- grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_shutdown_listeners(exec_ctx, tcp);
- grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
-}
-
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
- grpc_resolved_addresses *resolved = NULL;
- grpc_tcp_server *tcp = NULL;
- size_t i;
- size_t count = 0;
- int port_num = -1;
- int port_temp;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_error *err = GRPC_ERROR_NONE;
-
+ int port_num = 0;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
-
- grpc_error **errors = NULL;
- err = grpc_blocking_resolve_address(addr, "https", &resolved);
- if (err != GRPC_ERROR_NONE) {
- goto error;
- }
-
- err = grpc_tcp_server_create(&exec_ctx, NULL,
- grpc_server_get_channel_args(server), &tcp);
+ grpc_error* err = grpc_chttp2_server_add_port(
+ &exec_ctx, server, addr,
+ grpc_channel_args_copy(grpc_server_get_channel_args(server)),
+ NULL /* handshaker_factory */, &port_num);
if (err != GRPC_ERROR_NONE) {
- goto error;
- }
-
- const size_t naddrs = resolved->naddrs;
- errors = gpr_malloc(sizeof(*errors) * naddrs);
- for (i = 0; i < naddrs; i++) {
- errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp);
- if (errors[i] == GRPC_ERROR_NONE) {
- if (port_num == -1) {
- port_num = port_temp;
- } else {
- GPR_ASSERT(port_num == port_temp);
- }
- count++;
- }
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
}
- if (count == 0) {
- char *msg;
- gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
- naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
- gpr_free(msg);
- goto error;
- } else if (count != naddrs) {
- char *msg;
- gpr_asprintf(&msg, "Only %" PRIuPTR
- " addresses added out of total %" PRIuPTR " resolved",
- count, naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
- gpr_free(msg);
-
- const char *warning_message = grpc_error_string(err);
- gpr_log(GPR_INFO, "WARNING: %s", warning_message);
- grpc_error_free_string(warning_message);
- /* we managed to bind some addresses: continue */
- }
- grpc_resolved_addresses_destroy(resolved);
-
- /* Register with the server only upon success */
- grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy);
- goto done;
-
-/* Error path: cleanup and return */
-error:
- GPR_ASSERT(err != GRPC_ERROR_NONE);
- if (resolved) {
- grpc_resolved_addresses_destroy(resolved);
- }
- if (tcp) {
- grpc_tcp_server_unref(&exec_ctx, tcp);
- }
- port_num = 0;
-
- const char *msg = grpc_error_string(err);
- gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
- GRPC_ERROR_UNREF(err);
-
-done:
grpc_exec_ctx_finish(&exec_ctx);
- if (errors != NULL) {
- for (i = 0; i < naddrs; i++) {
- GRPC_ERROR_UNREF(errors[i]);
- }
- }
- gpr_free(errors);
return port_num;
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 942638ad7f..5f41728132 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -38,218 +38,63 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
+
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/iomgr/endpoint.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/transport/auth_filters.h"
-#include "src/core/lib/security/transport/security_connector.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-typedef struct server_secure_state {
- grpc_server *server;
- grpc_tcp_server *tcp;
- grpc_server_security_connector *sc;
- grpc_server_credentials *creds;
- bool is_shutdown;
- gpr_mu mu;
- grpc_closure tcp_server_shutdown_complete;
- grpc_closure *server_destroy_listener_done;
-} server_secure_state;
-
-typedef struct server_secure_connect {
- server_secure_state *server_state;
- grpc_pollset *accepting_pollset;
- grpc_tcp_server_acceptor *acceptor;
- grpc_handshake_manager *handshake_mgr;
- // TODO(roth): Remove the following two fields when we eliminate
- // grpc_server_security_connector_do_handshake().
- gpr_timespec deadline;
- grpc_channel_args *args;
-} server_secure_connect;
-
-static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_security_status status,
- grpc_endpoint *secure_endpoint,
- grpc_auth_context *auth_context) {
- server_secure_connect *connection_state = statep;
- if (status == GRPC_SECURITY_OK) {
- if (secure_endpoint) {
- gpr_mu_lock(&connection_state->server_state->mu);
- if (!connection_state->server_state->is_shutdown) {
- grpc_transport *transport = grpc_create_chttp2_transport(
- exec_ctx, grpc_server_get_channel_args(
- connection_state->server_state->server),
- secure_endpoint, 0);
- grpc_arg args_to_add[2];
- args_to_add[0] = grpc_server_credentials_to_arg(
- connection_state->server_state->creds);
- args_to_add[1] = grpc_auth_context_to_arg(auth_context);
- grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
- connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
- grpc_server_setup_transport(
- exec_ctx, connection_state->server_state->server, transport,
- connection_state->accepting_pollset, args_copy);
- grpc_channel_args_destroy(args_copy);
- grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL);
- } else {
- /* We need to consume this here, because the server may already have
- * gone away. */
- grpc_endpoint_destroy(exec_ctx, secure_endpoint);
- }
- gpr_mu_unlock(&connection_state->server_state->mu);
- }
- } else {
- gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
- }
- grpc_channel_args_destroy(connection_state->args);
- grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp);
- gpr_free(connection_state);
-}
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- server_secure_connect *connection_state = user_data;
- if (error != GRPC_ERROR_NONE) {
- const char *error_str = grpc_error_string(error);
- gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
- grpc_error_free_string(error_str);
- GRPC_ERROR_UNREF(error);
- grpc_channel_args_destroy(args);
- gpr_free(read_buffer);
- grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr);
- grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
- grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp);
- gpr_free(connection_state);
- return;
- }
- grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
- connection_state->handshake_mgr = NULL;
- // TODO(roth, jboeuf): Convert security connector handshaking to use new
- // handshake API, and then move the code from on_secure_handshake_done()
- // into this function.
- connection_state->args = args;
- grpc_server_security_connector_do_handshake(
- exec_ctx, connection_state->server_state->sc, connection_state->acceptor,
- endpoint, read_buffer, connection_state->deadline,
- on_secure_handshake_done, connection_state);
-}
-
-static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
- grpc_pollset *accepting_pollset,
- grpc_tcp_server_acceptor *acceptor) {
- server_secure_state *server_state = statep;
- server_secure_connect *connection_state = NULL;
- gpr_mu_lock(&server_state->mu);
- if (server_state->is_shutdown) {
- gpr_mu_unlock(&server_state->mu);
- grpc_endpoint_destroy(exec_ctx, tcp);
- return;
- }
- gpr_mu_unlock(&server_state->mu);
- grpc_tcp_server_ref(server_state->tcp);
- connection_state = gpr_malloc(sizeof(*connection_state));
- connection_state->server_state = server_state;
- connection_state->accepting_pollset = accepting_pollset;
- connection_state->acceptor = acceptor;
- connection_state->handshake_mgr = grpc_handshake_manager_create();
- // TODO(roth): We should really get this timeout value from channel
- // args instead of hard-coding it.
- connection_state->deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
- grpc_handshake_manager_do_handshake(
- exec_ctx, connection_state->handshake_mgr, tcp,
- grpc_server_get_channel_args(connection_state->server_state->server),
- connection_state->deadline, acceptor, on_handshake_done,
- connection_state);
+typedef struct {
+ grpc_chttp2_server_handshaker_factory base;
+ grpc_server_security_connector *security_connector;
+} server_security_handshaker_factory;
+
+static void server_security_handshaker_factory_create_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf,
+ grpc_handshake_manager *handshake_mgr) {
+ server_security_handshaker_factory *handshaker_factory =
+ (server_security_handshaker_factory *)hf;
+ grpc_server_security_connector_create_handshakers(
+ exec_ctx, handshaker_factory->security_connector, handshake_mgr);
}
-/* Server callback: start listening on our ports */
-static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server,
- void *statep, grpc_pollset **pollsets,
- size_t pollset_count) {
- server_secure_state *server_state = statep;
- gpr_mu_lock(&server_state->mu);
- server_state->is_shutdown = false;
- gpr_mu_unlock(&server_state->mu);
- grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count,
- on_accept, server_state);
+static void server_security_handshaker_factory_destroy(
+ grpc_exec_ctx* exec_ctx, grpc_chttp2_server_handshaker_factory *hf) {
+ server_security_handshaker_factory *handshaker_factory =
+ (server_security_handshaker_factory *)hf;
+ GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base,
+ "server");
+ gpr_free(hf);
}
-static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_error *error) {
- server_secure_state *server_state = statep;
- /* ensure all threads have unlocked */
- gpr_mu_lock(&server_state->mu);
- grpc_closure *destroy_done = server_state->server_destroy_listener_done;
- GPR_ASSERT(server_state->is_shutdown);
- gpr_mu_unlock(&server_state->mu);
- /* clean up */
- grpc_server_security_connector_shutdown(exec_ctx, server_state->sc);
-
- /* Flush queued work before a synchronous unref. */
- grpc_exec_ctx_flush(exec_ctx);
- GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server");
- grpc_server_credentials_unref(server_state->creds);
-
- if (destroy_done != NULL) {
- destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error));
- grpc_exec_ctx_flush(exec_ctx);
- }
- gpr_free(server_state);
-}
-
-static void server_destroy_listener(grpc_exec_ctx *exec_ctx,
- grpc_server *server, void *statep,
- grpc_closure *callback) {
- server_secure_state *server_state = statep;
- grpc_tcp_server *tcp;
- gpr_mu_lock(&server_state->mu);
- server_state->is_shutdown = true;
- server_state->server_destroy_listener_done = callback;
- tcp = server_state->tcp;
- gpr_mu_unlock(&server_state->mu);
- grpc_tcp_server_shutdown_listeners(exec_ctx, tcp);
- grpc_tcp_server_unref(exec_ctx, server_state->tcp);
-}
+static const grpc_chttp2_server_handshaker_factory_vtable
+ server_security_handshaker_factory_vtable = {
+ server_security_handshaker_factory_create_handshakers,
+ server_security_handshaker_factory_destroy};
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_server_credentials *creds) {
- grpc_resolved_addresses *resolved = NULL;
- grpc_tcp_server *tcp = NULL;
- server_secure_state *server_state = NULL;
- size_t i;
- size_t count = 0;
- int port_num = -1;
- int port_temp;
- grpc_security_status status = GRPC_SECURITY_ERROR;
- grpc_server_security_connector *sc = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *err = GRPC_ERROR_NONE;
- grpc_error **errors = NULL;
-
+ grpc_server_security_connector *sc = NULL;
+ int port_num = 0;
GRPC_API_TRACE(
"grpc_server_add_secure_http2_port("
"server=%p, addr=%s, creds=%p)",
3, (server, addr, creds));
-
- /* create security context */
+ // Create security context.
if (creds == NULL) {
err = GRPC_ERROR_CREATE(
"No credentials specified for secure server port (creds==NULL)");
- goto error;
+ goto done;
}
- status = grpc_server_credentials_create_security_connector(creds, &sc);
+ grpc_security_status status =
+ grpc_server_credentials_create_security_connector(creds, &sc);
if (status != GRPC_SECURITY_OK) {
char *msg;
gpr_asprintf(&msg,
@@ -258,107 +103,28 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
err = grpc_error_set_int(GRPC_ERROR_CREATE(msg),
GRPC_ERROR_INT_SECURITY_STATUS, status);
gpr_free(msg);
- goto error;
+ goto done;
}
- sc->channel_args = grpc_server_get_channel_args(server);
-
- /* resolve address */
- err = grpc_blocking_resolve_address(addr, "https", &resolved);
- if (err != GRPC_ERROR_NONE) {
- goto error;
- }
- server_state = gpr_malloc(sizeof(*server_state));
- memset(server_state, 0, sizeof(*server_state));
- grpc_closure_init(&server_state->tcp_server_shutdown_complete,
- tcp_server_shutdown_complete, server_state);
- err = grpc_tcp_server_create(&exec_ctx,
- &server_state->tcp_server_shutdown_complete,
- grpc_server_get_channel_args(server), &tcp);
+ // Create handshaker factory.
+ server_security_handshaker_factory* handshaker_factory =
+ gpr_malloc(sizeof(*handshaker_factory));
+ memset(handshaker_factory, 0, sizeof(*handshaker_factory));
+ handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable;
+ handshaker_factory->security_connector = sc;
+ // Create channel args.
+ grpc_arg channel_arg = grpc_server_credentials_to_arg(creds);
+ grpc_channel_args *args = grpc_channel_args_copy_and_add(
+ grpc_server_get_channel_args(server), &channel_arg, 1);
+ // Add server port.
+ err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args,
+ &handshaker_factory->base, &port_num);
+done:
+ grpc_exec_ctx_finish(&exec_ctx);
if (err != GRPC_ERROR_NONE) {
- goto error;
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
}
-
- server_state->server = server;
- server_state->tcp = tcp;
- server_state->sc = sc;
- server_state->creds = grpc_server_credentials_ref(creds);
- server_state->is_shutdown = true;
- gpr_mu_init(&server_state->mu);
-
- errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
- for (i = 0; i < resolved->naddrs; i++) {
- errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp);
- if (errors[i] == GRPC_ERROR_NONE) {
- if (port_num == -1) {
- port_num = port_temp;
- } else {
- GPR_ASSERT(port_num == port_temp);
- }
- count++;
- }
- }
- if (count == 0) {
- char *msg;
- gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
- resolved->naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
- gpr_free(msg);
- goto error;
- } else if (count != resolved->naddrs) {
- char *msg;
- gpr_asprintf(&msg, "Only %" PRIuPTR
- " addresses added out of total %" PRIuPTR " resolved",
- count, resolved->naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
- gpr_free(msg);
-
- const char *warning_message = grpc_error_string(err);
- gpr_log(GPR_INFO, "WARNING: %s", warning_message);
- grpc_error_free_string(warning_message);
- /* we managed to bind some addresses: continue */
- } else {
- for (i = 0; i < resolved->naddrs; i++) {
- GRPC_ERROR_UNREF(errors[i]);
- }
- }
- gpr_free(errors);
- errors = NULL;
- grpc_resolved_addresses_destroy(resolved);
-
- /* Register with the server only upon success */
- grpc_server_add_listener(&exec_ctx, server, server_state,
- server_start_listener, server_destroy_listener);
-
- grpc_exec_ctx_finish(&exec_ctx);
return port_num;
-
-/* Error path: cleanup and return */
-error:
- GPR_ASSERT(err != GRPC_ERROR_NONE);
- if (errors != NULL) {
- for (i = 0; i < resolved->naddrs; i++) {
- GRPC_ERROR_UNREF(errors[i]);
- }
- gpr_free(errors);
- }
- if (resolved) {
- grpc_resolved_addresses_destroy(resolved);
- }
- if (tcp) {
- grpc_tcp_server_unref(&exec_ctx, tcp);
- } else {
- if (sc) {
- grpc_exec_ctx_flush(&exec_ctx);
- GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server");
- }
- if (server_state) {
- gpr_free(server_state);
- }
- }
- grpc_exec_ctx_finish(&exec_ctx);
- const char *msg = grpc_error_string(err);
- GRPC_ERROR_UNREF(err);
- gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
- return 0;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 4e3c7ff681..3e7c078d3c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -111,9 +111,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_error *error);
static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -428,6 +425,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
+ grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -523,6 +521,10 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
}
+ if (s->fail_pending_writes_on_writes_finished_error != NULL) {
+ GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error);
+ }
+
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
GPR_ASSERT(s->fetching_send_message == NULL);
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
@@ -704,8 +706,6 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
}
- grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
-
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
GPR_UNREACHABLE_CODE(break);
@@ -734,6 +734,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break;
}
+ grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
+
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
@@ -1404,6 +1406,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
+ grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
}
@@ -1534,9 +1537,41 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_error *error) {
+void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ if (s->need_fail_pending_writes_on_writes_finished) {
+ grpc_error *error = s->fail_pending_writes_on_writes_finished_error;
+ s->fail_pending_writes_on_writes_finished_error = NULL;
+ s->need_fail_pending_writes_on_writes_finished = false;
+ grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error);
+ }
+}
+
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_error *error) {
+ if (s->need_fail_pending_writes_on_writes_finished ||
+ (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE &&
+ (s->included[GRPC_CHTTP2_LIST_WRITABLE] ||
+ s->included[GRPC_CHTTP2_LIST_WRITING]))) {
+ /* If a write is in progress, and it involves this stream, wait for the
+ * write to complete before cancelling things out. If we don't do this, then
+ * our combiner lock might think that some operation on its queue might be
+ * covering a completion even though there is none, in which case we might
+ * offload to another thread, which isn't guarateed to exist */
+ if (error != GRPC_ERROR_NONE) {
+ if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) {
+ s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE(
+ "Post-poned fail writes due to in-progress write");
+ }
+ s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child(
+ s->fail_pending_writes_on_writes_finished_error, error);
+ }
+ s->need_fail_pending_writes_on_writes_finished = true;
+ return; /* early out */
+ }
+
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = NULL;
@@ -1590,7 +1625,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (close_writes && !s->write_closed) {
s->write_closed_error = GRPC_ERROR_REF(error);
s->write_closed = true;
- fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
+ grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (s->read_closed && s->write_closed) {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b74233d992..6cba1e7fd2 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -409,6 +409,9 @@ struct grpc_chttp2_stream {
grpc_error *read_closed_error;
/** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error;
+ /** should any writes be cleared once this stream becomes non-writable */
+ bool need_fail_pending_writes_on_writes_finished;
+ grpc_error *fail_pending_writes_on_writes_finished_error;
grpc_published_metadata_method published_metadata[2];
bool final_metadata_requested;
@@ -689,4 +692,11 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
+void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_error *error);
+
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 139e7387c4..769b229a0d 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -208,6 +208,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
}
} else {
+ grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
}
}
@@ -252,6 +253,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error));
}
+ grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
}
grpc_slice_buffer_reset_and_unref(&t->outbuf);
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index a45a39981c..90626dc2d1 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -38,67 +38,66 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/timer.h"
//
// grpc_handshaker
//
-void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker) {
handshaker->vtable = vtable;
}
-void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {
+static void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
handshaker->vtable->destroy(exec_ctx, handshaker);
}
-void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {
+static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
handshaker->vtable->shutdown(exec_ctx, handshaker);
}
-void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- gpr_timespec deadline,
- grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data) {
- handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
- read_buffer, deadline, acceptor, cb,
- user_data);
+static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args) {
+ handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor,
+ on_handshake_done, args);
}
//
// grpc_handshake_manager
//
-// State used while chaining handshakers.
-struct grpc_handshaker_state {
- // The index of the handshaker to invoke next.
- size_t index;
- // The deadline for all handshakers.
- gpr_timespec deadline;
- // The acceptor to call the handshakers with.
- grpc_tcp_server_acceptor* acceptor;
- // The final callback and user_data to invoke after the last handshaker.
- grpc_handshaker_done_cb final_cb;
- void* final_user_data;
-};
-
struct grpc_handshake_manager {
+ gpr_mu mu;
+ gpr_refcount refs;
+ bool shutdown;
// An array of handshakers added via grpc_handshake_manager_add().
size_t count;
grpc_handshaker** handshakers;
- // State used while chaining handshakers.
- struct grpc_handshaker_state* state;
+ // The index of the handshaker to invoke next and closure to invoke it.
+ size_t index;
+ grpc_closure call_next_handshaker;
+ // The acceptor to call the handshakers with.
+ grpc_tcp_server_acceptor* acceptor;
+ // Deadline timer across all handshakers.
+ grpc_timer deadline_timer;
+ // The final callback and user_data to invoke after the last handshaker.
+ grpc_closure on_handshake_done;
+ void* user_data;
+ // Handshaker args.
+ grpc_handshaker_args args;
};
grpc_handshake_manager* grpc_handshake_manager_create() {
grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager));
memset(mgr, 0, sizeof(*mgr));
+ gpr_mu_init(&mgr->mu);
+ gpr_ref_init(&mgr->refs, 1);
return mgr;
}
@@ -106,6 +105,7 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; }
void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
grpc_handshaker* handshaker) {
+ gpr_mu_lock(&mgr->mu);
// To avoid allocating memory for each handshaker we add, we double
// the number of elements every time we need more.
size_t realloc_count = 0;
@@ -119,85 +119,116 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*));
}
mgr->handshakers[mgr->count++] = handshaker;
+ gpr_mu_unlock(&mgr->mu);
+}
+
+static void grpc_handshake_manager_unref(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr) {
+ if (gpr_unref(&mgr->refs)) {
+ for (size_t i = 0; i < mgr->count; ++i) {
+ grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]);
+ }
+ gpr_free(mgr->handshakers);
+ gpr_mu_destroy(&mgr->mu);
+ gpr_free(mgr);
+ }
}
void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
- for (size_t i = 0; i < mgr->count; ++i) {
- grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]);
- }
- gpr_free(mgr->handshakers);
- gpr_free(mgr);
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
- for (size_t i = 0; i < mgr->count; ++i) {
- grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
+ gpr_mu_lock(&mgr->mu);
+ // Shutdown the handshaker that's currently in progress, if any.
+ if (!mgr->shutdown && mgr->index > 0) {
+ mgr->shutdown = true;
+ grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
}
- if (mgr->state != NULL) {
- gpr_free(mgr->state);
- mgr->state = NULL;
+ gpr_mu_unlock(&mgr->mu);
+}
+
+// Helper function to call either the next handshaker or the
+// on_handshake_done callback.
+// Returns true if we've scheduled the on_handshake_done callback.
+static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr,
+ grpc_error* error) {
+ GPR_ASSERT(mgr->index <= mgr->count);
+ // If we got an error or we've been shut down or we've finished the last
+ // handshaker, invoke the on_handshake_done callback. Otherwise, call the
+ // next handshaker.
+ if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->index == mgr->count) {
+ // Cancel deadline timer, since we're invoking the on_handshake_done
+ // callback now.
+ grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
+ grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL);
+ mgr->shutdown = true;
+ } else {
+ grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
+ mgr->acceptor, &mgr->call_next_handshaker,
+ &mgr->args);
}
+ ++mgr->index;
+ return mgr->shutdown;
}
// A function used as the handshaker-done callback when chaining
// handshakers together.
-static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- void* user_data, grpc_error* error) {
- grpc_handshake_manager* mgr = user_data;
- GPR_ASSERT(mgr->state != NULL);
- GPR_ASSERT(mgr->state->index < mgr->count);
- // If we got an error, skip all remaining handshakers and invoke the
- // caller-supplied callback immediately.
- if (error != GRPC_ERROR_NONE) {
- mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer,
- mgr->state->final_user_data, error);
- return;
+static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_handshake_manager* mgr = arg;
+ gpr_mu_lock(&mgr->mu);
+ bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_REF(error));
+ gpr_mu_unlock(&mgr->mu);
+ // If we're invoked the final callback, we won't be coming back
+ // to this function, so we can release our reference to the
+ // handshake manager.
+ if (done) {
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
- grpc_handshaker_done_cb cb = call_next_handshaker;
- // If this is the last handshaker, use the caller-supplied callback
- // and user_data instead of chaining back to this function again.
- if (mgr->state->index == mgr->count - 1) {
- cb = mgr->state->final_cb;
- user_data = mgr->state->final_user_data;
- }
- // Invoke handshaker.
- grpc_handshaker_do_handshake(
- exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args,
- read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data);
- ++mgr->state->index;
- // If this is the last handshaker, clean up state.
- if (mgr->state->index == mgr->count) {
- gpr_free(mgr->state);
- mgr->state = NULL;
+}
+
+// Callback invoked when deadline is exceeded.
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_handshake_manager* mgr = arg;
+ if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
+ grpc_handshake_manager_shutdown(exec_ctx, mgr);
}
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
- grpc_endpoint* endpoint, const grpc_channel_args* args,
+ grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data) {
- grpc_channel_args* args_copy = grpc_channel_args_copy(args);
- grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
- grpc_slice_buffer_init(read_buffer);
- if (mgr->count == 0) {
- // No handshakers registered, so we just immediately call the done
- // callback with the passed-in endpoint.
- cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE);
- } else {
- GPR_ASSERT(mgr->state == NULL);
- mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
- memset(mgr->state, 0, sizeof(*mgr->state));
- mgr->state->deadline = deadline;
- mgr->state->acceptor = acceptor;
- mgr->state->final_cb = cb;
- mgr->state->final_user_data = user_data;
- call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr,
- GRPC_ERROR_NONE);
+ grpc_iomgr_cb_func on_handshake_done, void* user_data) {
+ gpr_mu_lock(&mgr->mu);
+ GPR_ASSERT(mgr->index == 0);
+ GPR_ASSERT(!mgr->shutdown);
+ // Construct handshaker args. These will be passed through all
+ // handshakers and eventually be freed by the on_handshake_done callback.
+ mgr->args.endpoint = endpoint;
+ mgr->args.args = grpc_channel_args_copy(channel_args);
+ mgr->args.user_data = user_data;
+ mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer));
+ grpc_slice_buffer_init(mgr->args.read_buffer);
+ // Initialize state needed for calling handshakers.
+ mgr->acceptor = acceptor;
+ grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
+ grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);
+ // Start deadline timer, which owns a ref.
+ gpr_ref(&mgr->refs);
+ grpc_timer_init(exec_ctx, &mgr->deadline_timer,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC));
+ // Start first handshaker, which also owns a ref.
+ gpr_ref(&mgr->refs);
+ bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&mgr->mu);
+ if (done) {
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index f8a36c6473..ebbc1ff7f3 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -54,15 +54,30 @@
typedef struct grpc_handshaker grpc_handshaker;
-/// Callback type invoked when a handshaker is done.
-/// Takes ownership of \a args and \a read_buffer.
-typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- void* user_data, grpc_error* error);
-
-struct grpc_handshaker_vtable {
+/// Arguments passed through handshakers and to the on_handshake_done callback.
+///
+/// For handshakers, all members are input/output parameters; for
+/// example, a handshaker may read from or write to \a endpoint and
+/// then later replace it with a wrapped endpoint. Similarly, a
+/// handshaker may modify \a args.
+///
+/// A handshaker takes ownership of the members while a handshake is in
+/// progress. Upon failure or shutdown of an in-progress handshaker,
+/// the handshaker is responsible for destroying the members and setting
+/// them to NULL before invoking the on_handshake_done callback.
+///
+/// For the on_handshake_done callback, all members are input arguments,
+/// which the callback takes ownership of.
+typedef struct {
+ grpc_endpoint* endpoint;
+ grpc_channel_args* args;
+ grpc_slice_buffer* read_buffer;
+ // User data passed through the handshake manager. Not used by
+ // individual handshakers.
+ void* user_data;
+} grpc_handshaker_args;
+
+typedef struct {
/// Destroys the handshaker.
void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
@@ -70,44 +85,26 @@ struct grpc_handshaker_vtable {
/// aborted in the middle).
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
- /// Performs handshaking. When finished, calls \a cb with \a user_data.
- /// Takes ownership of \a args.
- /// Takes ownership of \a read_buffer, which contains leftover bytes read
- /// from the endpoint by the previous handshaker.
+ /// Performs handshaking, modifying \a args as needed (e.g., to
+ /// replace \a endpoint with a wrapped endpoint).
+ /// When finished, invokes \a on_handshake_done.
/// \a acceptor will be NULL for client-side handshakers.
void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
- grpc_endpoint* endpoint, grpc_channel_args* args,
- grpc_slice_buffer* read_buffer, gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data);
-};
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args);
+} grpc_handshaker_vtable;
/// Base struct. To subclass, make this the first member of the
/// implementation struct.
struct grpc_handshaker {
- const struct grpc_handshaker_vtable* vtable;
+ const grpc_handshaker_vtable* vtable;
};
/// Called by concrete implementations to initialize the base struct.
-void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker);
-/// Convenient wrappers for invoking methods via the vtable.
-/// These probably do not need to be called from anywhere but
-/// grpc_handshake_manager.
-void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker);
-void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker);
-void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- gpr_timespec deadline,
- grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data);
-
///
/// grpc_handshake_manager
///
@@ -134,15 +131,21 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr);
/// Invokes handshakers in the order they were added.
-/// Does NOT take ownership of \a args. Instead, makes a copy before
+/// Takes ownership of \a endpoint, and then passes that ownership to
+/// the \a on_handshake_done callback.
+/// Does NOT take ownership of \a channel_args. Instead, makes a copy before
/// invoking the first handshaker.
/// \a acceptor will be NULL for client-side handshakers.
-/// Invokes \a cb with \a user_data after either a handshaker fails or
-/// all handshakers have completed successfully.
+///
+/// When done, invokes \a on_handshake_done with a grpc_handshaker_args
+/// object as its argument. If the callback is invoked with error !=
+/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done
+/// the necessary clean-up. Otherwise, the callback takes ownership of
+/// the arguments.
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
- grpc_endpoint* endpoint, const grpc_channel_args* args,
+ grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data);
+ grpc_iomgr_cb_func on_handshake_done, void* user_data);
#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 24d264c32a..0ab34d00e4 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -38,7 +38,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/security/transport/handshake.h"
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/security/transport/security_handshaker.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/ssl_transport_security.h"
@@ -58,52 +60,42 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
gpr_free(sc);
}
-static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+static void httpcli_ssl_create_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc;
- tsi_result result = TSI_OK;
- tsi_handshaker *handshaker;
- if (c->handshaker_factory == NULL) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- return;
- }
- result = tsi_ssl_handshaker_factory_create_handshaker(
- c->handshaker_factory, c->secure_peer_name, &handshaker);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
- tsi_result_to_string(result));
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- } else {
- grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
- nonsecure_endpoint, read_buffer, deadline, cb,
- user_data);
+ tsi_handshaker *handshaker = NULL;
+ if (c->handshaker_factory != NULL) {
+ tsi_result result = tsi_ssl_handshaker_factory_create_handshaker(
+ c->handshaker_factory, c->secure_peer_name, &handshaker);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
+ tsi_result_to_string(result));
+ }
}
+ grpc_security_create_handshakers(exec_ctx, handshaker, &sc->base,
+ handshake_mgr);
}
static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc;
- grpc_security_status status = GRPC_SECURITY_OK;
+ grpc_error *error = GRPC_ERROR_NONE;
/* Check the peer name. */
if (c->secure_peer_name != NULL &&
!tsi_ssl_peer_matches_name(&peer, c->secure_peer_name)) {
- gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate",
- c->secure_peer_name);
- status = GRPC_SECURITY_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "Peer name %s is not in peer certificate",
+ c->secure_peer_name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
}
- cb(exec_ctx, user_data, status, NULL);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
tsi_peer_destruct(&peer);
}
@@ -140,7 +132,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
- c->base.do_handshake = httpcli_ssl_do_handshake;
+ c->base.create_handshakers = httpcli_ssl_create_handshakers;
*sc = &c->base;
return GRPC_SECURITY_OK;
}
@@ -150,19 +142,25 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
typedef struct {
void (*func)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint);
void *arg;
+ grpc_handshake_manager *handshake_mgr;
} on_done_closure;
-static void on_secure_transport_setup_done(grpc_exec_ctx *exec_ctx, void *rp,
- grpc_security_status status,
- grpc_endpoint *secure_endpoint,
- grpc_auth_context *auth_context) {
- on_done_closure *c = rp;
- if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_handshaker_args *args = arg;
+ on_done_closure *c = args->user_data;
+ if (error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "Secure transport setup failed: %s", msg);
+ grpc_error_free_string(msg);
c->func(exec_ctx, c->arg, NULL);
} else {
- c->func(exec_ctx, c->arg, secure_endpoint);
+ grpc_channel_args_destroy(args->args);
+ grpc_slice_buffer_destroy(args->read_buffer);
+ gpr_free(args->read_buffer);
+ c->func(exec_ctx, c->arg, args->endpoint);
}
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
@@ -183,11 +181,15 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
}
c->func = on_done;
c->arg = arg;
+ c->handshake_mgr = grpc_handshake_manager_create();
GPR_ASSERT(httpcli_ssl_channel_security_connector_create(
pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
- grpc_channel_security_connector_do_handshake(
- exec_ctx, sc, tcp, NULL, deadline, on_secure_transport_setup_done, c);
+ grpc_channel_security_connector_create_handshakers(exec_ctx, sc,
+ c->handshake_mgr);
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline,
+ NULL /* acceptor */, on_handshake_done, c /* user_data */);
GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
}
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 60ee14eb23..cfc67020ae 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -90,6 +90,12 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
}
+#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
+#define IS_COVERED_BY_POLLER_ARGS(lock) \
+ (lock)->final_list_covered_by_poller, \
+ gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
+ is_covered_by_poller((lock))
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
@@ -197,9 +203,10 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
- "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
+ "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
+ " exec_ctx_ready_to_finish=%d "
"time_to_execute_final_list=%d",
- lock, lock->optional_workqueue, is_covered_by_poller(lock),
+ lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
grpc_exec_ctx_ready_to_finish(exec_ctx),
lock->time_to_execute_final_list));
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 379bf9bd23..213d29600c 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -144,6 +144,12 @@ struct grpc_resource_quota {
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
+ /* This is only really usable for debugging: it's always a stale pointer, but
+ a stale pointer that might just be fresh enough to guide us to where the
+ reclamation system is stuck */
+ grpc_closure *debug_only_last_initiated_reclaimer;
+ grpc_resource_user *debug_only_last_reclaimer_resource_user;
+
/* Roots of all resource user lists */
grpc_resource_user *roots[GRPC_RULIST_COUNT];
@@ -225,6 +231,7 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
resource_user->links[list].prev;
resource_user->links[list].prev->links[list].next =
resource_user->links[list].next;
+ resource_user->links[list].next = resource_user->links[list].prev = NULL;
}
/*******************************************************************************
@@ -340,6 +347,9 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
resource_quota->reclaiming = true;
grpc_resource_quota_internal_ref(resource_quota);
grpc_closure *c = resource_user->reclaimers[destructive];
+ GPR_ASSERT(c);
+ resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
+ resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = NULL;
grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
return true;
@@ -476,6 +486,8 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
GRPC_ERROR_CANCELLED, NULL);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
+ rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
+ rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index fd0c7a0f9d..3c24ea9afa 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -388,7 +388,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
/* Try listening on IPv6 first. */
addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
@@ -402,7 +403,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
}
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}
diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c
deleted file mode 100644
index 9623797610..0000000000
--- a/src/core/lib/security/transport/handshake.c
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/security/transport/handshake.h"
-
-#include <stdbool.h>
-#include <string.h>
-
-#include <grpc/slice_buffer.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/security/context/security_context.h"
-#include "src/core/lib/security/transport/secure_endpoint.h"
-#include "src/core/lib/security/transport/tsi_error.h"
-
-#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
-
-typedef struct {
- grpc_security_connector *connector;
- tsi_handshaker *handshaker;
- bool is_client_side;
- unsigned char *handshake_buffer;
- size_t handshake_buffer_size;
- grpc_endpoint *wrapped_endpoint;
- grpc_endpoint *secure_endpoint;
- grpc_slice_buffer left_overs;
- grpc_slice_buffer incoming;
- grpc_slice_buffer outgoing;
- grpc_security_handshake_done_cb cb;
- void *user_data;
- grpc_closure on_handshake_data_sent_to_peer;
- grpc_closure on_handshake_data_received_from_peer;
- grpc_auth_context *auth_context;
- grpc_timer timer;
- gpr_refcount refs;
-} grpc_security_handshake;
-
-static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *setup,
- grpc_error *error);
-
-static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
- grpc_error *error);
-
-static void security_connector_remove_handshake(grpc_security_handshake *h) {
- GPR_ASSERT(!h->is_client_side);
- grpc_security_connector_handshake_list *node;
- grpc_security_connector_handshake_list *tmp;
- grpc_server_security_connector *sc =
- (grpc_server_security_connector *)h->connector;
- gpr_mu_lock(&sc->mu);
- node = sc->handshaking_handshakes;
- if (node && node->handshake == h) {
- sc->handshaking_handshakes = node->next;
- gpr_free(node);
- gpr_mu_unlock(&sc->mu);
- return;
- }
- while (node) {
- if (node->next->handshake == h) {
- tmp = node->next;
- node->next = node->next->next;
- gpr_free(tmp);
- gpr_mu_unlock(&sc->mu);
- return;
- }
- node = node->next;
- }
- gpr_mu_unlock(&sc->mu);
-}
-
-static void unref_handshake(grpc_security_handshake *h) {
- if (gpr_unref(&h->refs)) {
- if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker);
- if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer);
- grpc_slice_buffer_destroy(&h->left_overs);
- grpc_slice_buffer_destroy(&h->outgoing);
- grpc_slice_buffer_destroy(&h->incoming);
- GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake");
- GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake");
- gpr_free(h);
- }
-}
-
-static void security_handshake_done(grpc_exec_ctx *exec_ctx,
- grpc_security_handshake *h,
- grpc_error *error) {
- grpc_timer_cancel(exec_ctx, &h->timer);
- if (!h->is_client_side) {
- security_connector_remove_handshake(h);
- }
- if (error == GRPC_ERROR_NONE) {
- h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint,
- h->auth_context);
- } else {
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg);
- grpc_error_free_string(msg);
-
- if (h->secure_endpoint != NULL) {
- grpc_endpoint_shutdown(exec_ctx, h->secure_endpoint);
- grpc_endpoint_destroy(exec_ctx, h->secure_endpoint);
- } else {
- grpc_endpoint_destroy(exec_ctx, h->wrapped_endpoint);
- }
- h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- }
- unref_handshake(h);
- GRPC_ERROR_UNREF(error);
-}
-
-static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_security_status status,
- grpc_auth_context *auth_context) {
- grpc_security_handshake *h = user_data;
- tsi_frame_protector *protector;
- tsi_result result;
- if (status != GRPC_SECURITY_OK) {
- security_handshake_done(
- exec_ctx, h,
- grpc_error_set_int(GRPC_ERROR_CREATE("Error checking peer."),
- GRPC_ERROR_INT_SECURITY_STATUS, status));
- return;
- }
- h->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "handshake");
- result =
- tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
- if (result != TSI_OK) {
- security_handshake_done(
- exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Frame protector creation failed"), result));
- return;
- }
- h->secure_endpoint =
- grpc_secure_endpoint_create(protector, h->wrapped_endpoint,
- h->left_overs.slices, h->left_overs.count);
- h->left_overs.count = 0;
- h->left_overs.length = 0;
- security_handshake_done(exec_ctx, h, GRPC_ERROR_NONE);
- return;
-}
-
-static void check_peer(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h) {
- tsi_peer peer;
- tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
-
- if (result != TSI_OK) {
- security_handshake_done(
- exec_ctx, h, grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Peer extraction failed"), result));
- return;
- }
- grpc_security_connector_check_peer(exec_ctx, h->connector, peer,
- on_peer_checked, h);
-}
-
-static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx,
- grpc_security_handshake *h) {
- size_t offset = 0;
- tsi_result result = TSI_OK;
- grpc_slice to_send;
-
- do {
- size_t to_send_size = h->handshake_buffer_size - offset;
- result = tsi_handshaker_get_bytes_to_send_to_peer(
- h->handshaker, h->handshake_buffer + offset, &to_send_size);
- offset += to_send_size;
- if (result == TSI_INCOMPLETE_DATA) {
- h->handshake_buffer_size *= 2;
- h->handshake_buffer =
- gpr_realloc(h->handshake_buffer, h->handshake_buffer_size);
- }
- } while (result == TSI_INCOMPLETE_DATA);
-
- if (result != TSI_OK) {
- security_handshake_done(exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Handshake failed"), result));
- return;
- }
-
- to_send =
- grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset);
- grpc_slice_buffer_reset_and_unref(&h->outgoing);
- grpc_slice_buffer_add(&h->outgoing, to_send);
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- grpc_endpoint_write(exec_ctx, h->wrapped_endpoint, &h->outgoing,
- &h->on_handshake_data_sent_to_peer);
-}
-
-static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *handshake,
- grpc_error *error) {
- grpc_security_handshake *h = handshake;
- size_t consumed_slice_size = 0;
- tsi_result result = TSI_OK;
- size_t i;
- size_t num_left_overs;
- int has_left_overs_in_current_slice = 0;
-
- if (error != GRPC_ERROR_NONE) {
- security_handshake_done(
- exec_ctx, h,
- GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1));
- return;
- }
-
- for (i = 0; i < h->incoming.count; i++) {
- consumed_slice_size = GRPC_SLICE_LENGTH(h->incoming.slices[i]);
- result = tsi_handshaker_process_bytes_from_peer(
- h->handshaker, GRPC_SLICE_START_PTR(h->incoming.slices[i]),
- &consumed_slice_size);
- if (!tsi_handshaker_is_in_progress(h->handshaker)) break;
- }
-
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
- /* We may need more data. */
- if (result == TSI_INCOMPLETE_DATA) {
- grpc_endpoint_read(exec_ctx, h->wrapped_endpoint, &h->incoming,
- &h->on_handshake_data_received_from_peer);
- return;
- } else {
- send_handshake_bytes_to_peer(exec_ctx, h);
- return;
- }
- }
-
- if (result != TSI_OK) {
- security_handshake_done(exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Handshake failed"), result));
- return;
- }
-
- /* Handshake is done and successful this point. */
- has_left_overs_in_current_slice =
- (consumed_slice_size < GRPC_SLICE_LENGTH(h->incoming.slices[i]));
- num_left_overs =
- (has_left_overs_in_current_slice ? 1 : 0) + h->incoming.count - i - 1;
- if (num_left_overs == 0) {
- check_peer(exec_ctx, h);
- return;
- }
-
- /* Put the leftovers in our buffer (ownership transfered). */
- if (has_left_overs_in_current_slice) {
- grpc_slice_buffer_add(
- &h->left_overs,
- grpc_slice_split_tail(&h->incoming.slices[i], consumed_slice_size));
- grpc_slice_unref(
- h->incoming.slices[i]); /* split_tail above increments refcount. */
- }
- grpc_slice_buffer_addn(
- &h->left_overs, &h->incoming.slices[i + 1],
- num_left_overs - (size_t)has_left_overs_in_current_slice);
- check_peer(exec_ctx, h);
-}
-
-/* If handshake is NULL, the handshake is done. */
-static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, grpc_error *error) {
- grpc_security_handshake *h = handshake;
-
- /* Make sure that write is OK. */
- if (error != GRPC_ERROR_NONE) {
- if (handshake != NULL)
- security_handshake_done(
- exec_ctx, h,
- GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1));
- return;
- }
-
- /* We may be done. */
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- grpc_endpoint_read(exec_ctx, h->wrapped_endpoint, &h->incoming,
- &h->on_handshake_data_received_from_peer);
- } else {
- check_peer(exec_ctx, h);
- }
-}
-
-static void on_timeout(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_security_handshake *h = arg;
- if (error == GRPC_ERROR_NONE) {
- grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint);
- }
- unref_handshake(h);
-}
-
-void grpc_do_security_handshake(
- grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
- grpc_security_connector *connector, bool is_client_side,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb,
- void *user_data) {
- grpc_security_connector_handshake_list *handshake_node;
- grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake));
- memset(h, 0, sizeof(grpc_security_handshake));
- h->handshaker = handshaker;
- h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
- h->is_client_side = is_client_side;
- h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
- h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
- h->wrapped_endpoint = nonsecure_endpoint;
- h->user_data = user_data;
- h->cb = cb;
- gpr_ref_init(&h->refs, 2); /* timer and handshake proper each get a ref */
- grpc_closure_init(&h->on_handshake_data_sent_to_peer,
- on_handshake_data_sent_to_peer, h);
- grpc_closure_init(&h->on_handshake_data_received_from_peer,
- on_handshake_data_received_from_peer, h);
- grpc_slice_buffer_init(&h->left_overs);
- grpc_slice_buffer_init(&h->outgoing);
- grpc_slice_buffer_init(&h->incoming);
- if (read_buffer != NULL) {
- grpc_slice_buffer_move_into(read_buffer, &h->incoming);
- gpr_free(read_buffer);
- }
- if (!is_client_side) {
- grpc_server_security_connector *server_connector =
- (grpc_server_security_connector *)connector;
- handshake_node = gpr_malloc(sizeof(grpc_security_connector_handshake_list));
- handshake_node->handshake = h;
- gpr_mu_lock(&server_connector->mu);
- handshake_node->next = server_connector->handshaking_handshakes;
- server_connector->handshaking_handshakes = handshake_node;
- gpr_mu_unlock(&server_connector->mu);
- }
- send_handshake_bytes_to_peer(exec_ctx, h);
- grpc_timer_init(exec_ctx, &h->timer,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- on_timeout, h, gpr_now(GPR_CLOCK_MONOTONIC));
-}
-
-void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx,
- void *handshake) {
- grpc_security_handshake *h = handshake;
- grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint);
-}
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 0fbd63a7e1..a2e0c7c7c7 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -46,8 +46,8 @@
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/transport/handshake.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
+#include "src/core/lib/security/transport/security_handshaker.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/fake_transport_security.h"
@@ -111,58 +111,34 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
return NULL;
}
-void grpc_server_security_connector_shutdown(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector) {
- grpc_security_connector_handshake_list *tmp;
- gpr_mu_lock(&connector->mu);
- while (connector->handshaking_handshakes) {
- tmp = connector->handshaking_handshakes;
- grpc_security_handshake_shutdown(
- exec_ctx, connector->handshaking_handshakes->handshake);
- connector->handshaking_handshakes = tmp->next;
- gpr_free(tmp);
+void grpc_channel_security_connector_create_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
+ grpc_handshake_manager *handshake_mgr) {
+ if (connector != NULL) {
+ connector->create_handshakers(exec_ctx, connector, handshake_mgr);
}
- gpr_mu_unlock(&connector->mu);
}
-void grpc_channel_security_connector_do_handshake(
- grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb,
- void *user_data) {
- if (sc == NULL || nonsecure_endpoint == NULL) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- } else {
- sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, read_buffer, deadline,
- cb, user_data);
- }
-}
-
-void grpc_server_security_connector_do_handshake(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
- if (sc == NULL || nonsecure_endpoint == NULL) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- } else {
- sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, read_buffer,
- deadline, cb, user_data);
+void grpc_server_security_connector_create_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector,
+ grpc_handshake_manager *handshake_mgr) {
+ if (connector != NULL) {
+ connector->create_handshakers(exec_ctx, connector, handshake_mgr);
}
}
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
if (sc == NULL) {
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, on_peer_checked,
+ GRPC_ERROR_CREATE("cannot check peer -- no security connector"), NULL);
tsi_peer_destruct(&peer);
} else {
- sc->vtable->check_peer(exec_ctx, sc, peer, cb, user_data);
+ sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
}
}
@@ -262,45 +238,41 @@ static void fake_channel_destroy(grpc_security_connector *sc) {
gpr_free(sc);
}
-static void fake_server_destroy(grpc_security_connector *sc) {
- grpc_server_security_connector *c = (grpc_server_security_connector *)sc;
- gpr_mu_destroy(&c->mu);
- gpr_free(sc);
-}
+static void fake_server_destroy(grpc_security_connector *sc) { gpr_free(sc); }
static void fake_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb, void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
const char *prop_name;
- grpc_security_status status = GRPC_SECURITY_OK;
- grpc_auth_context *auth_context = NULL;
+ grpc_error *error = GRPC_ERROR_NONE;
+ *auth_context = NULL;
if (peer.property_count != 1) {
- gpr_log(GPR_ERROR, "Fake peers should only have 1 property.");
- status = GRPC_SECURITY_ERROR;
+ error = GRPC_ERROR_CREATE("Fake peers should only have 1 property.");
goto end;
}
prop_name = peer.properties[0].name;
if (prop_name == NULL ||
strcmp(prop_name, TSI_CERTIFICATE_TYPE_PEER_PROPERTY)) {
- gpr_log(GPR_ERROR, "Unexpected property in fake peer: %s.",
- prop_name == NULL ? "<EMPTY>" : prop_name);
- status = GRPC_SECURITY_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "Unexpected property in fake peer: %s.",
+ prop_name == NULL ? "<EMPTY>" : prop_name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
goto end;
}
if (strncmp(peer.properties[0].value.data, TSI_FAKE_CERTIFICATE_TYPE,
peer.properties[0].value.length)) {
- gpr_log(GPR_ERROR, "Invalid value for cert type property.");
- status = GRPC_SECURITY_ERROR;
+ error = GRPC_ERROR_CREATE("Invalid value for cert type property.");
goto end;
}
- auth_context = grpc_auth_context_create(NULL);
+ *auth_context = grpc_auth_context_create(NULL);
grpc_auth_context_add_cstring_property(
- auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
+ *auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
- cb(exec_ctx, user_data, status, auth_context);
- grpc_auth_context_unref(auth_context);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
tsi_peer_destruct(&peer);
}
@@ -313,26 +285,20 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
cb(exec_ctx, user_data, GRPC_SECURITY_OK);
}
-static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
- grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), &sc->base,
- true, nonsecure_endpoint, read_buffer, deadline,
- cb, user_data);
+static void fake_channel_create_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
+ grpc_security_create_handshakers(
+ exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base,
+ handshake_mgr);
}
-static void fake_server_do_handshake(
+static void fake_server_create_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
- grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), &sc->base,
- false, nonsecure_endpoint, read_buffer, deadline,
- cb, user_data);
+ grpc_handshake_manager *handshake_mgr) {
+ grpc_security_create_handshakers(
+ exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base,
+ handshake_mgr);
}
static grpc_security_connector_vtable fake_channel_vtable = {
@@ -350,7 +316,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
c->base.vtable = &fake_channel_vtable;
c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds);
c->check_call_host = fake_channel_check_call_host;
- c->do_handshake = fake_channel_do_handshake;
+ c->create_handshakers = fake_channel_create_handshakers;
return c;
}
@@ -362,8 +328,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create(
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
- c->do_handshake = fake_server_do_handshake;
- gpr_mu_init(&c->mu);
+ c->create_handshakers = fake_server_create_handshakers;
return c;
}
@@ -396,11 +361,9 @@ static void ssl_channel_destroy(grpc_security_connector *sc) {
static void ssl_server_destroy(grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
-
if (c->handshaker_factory != NULL) {
tsi_ssl_handshaker_factory_destroy(c->handshaker_factory);
}
- gpr_mu_destroy(&c->base.mu);
gpr_free(sc);
}
@@ -419,49 +382,33 @@ static grpc_security_status ssl_create_handshaker(
return GRPC_SECURITY_OK;
}
-static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+static void ssl_channel_create_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
- tsi_handshaker *handshaker;
- grpc_security_status status = ssl_create_handshaker(
- c->handshaker_factory, true,
- c->overridden_target_name != NULL ? c->overridden_target_name
- : c->target_name,
- &handshaker);
- if (status != GRPC_SECURITY_OK) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, status, NULL, NULL);
- } else {
- grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
- nonsecure_endpoint, read_buffer, deadline, cb,
- user_data);
- }
-}
-
-static void ssl_server_do_handshake(
+ // Instantiate TSI handshaker.
+ tsi_handshaker *tsi_hs = NULL;
+ ssl_create_handshaker(c->handshaker_factory, true /* is_client */,
+ c->overridden_target_name != NULL
+ ? c->overridden_target_name
+ : c->target_name,
+ &tsi_hs);
+ // Create handshakers.
+ grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr);
+}
+
+static void ssl_server_create_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
+ grpc_handshake_manager *handshake_mgr) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
- tsi_handshaker *handshaker;
- grpc_security_status status =
- ssl_create_handshaker(c->handshaker_factory, false, NULL, &handshaker);
- if (status != GRPC_SECURITY_OK) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, status, NULL, NULL);
- } else {
- grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, false,
- nonsecure_endpoint, read_buffer, deadline, cb,
- user_data);
- }
+ // Instantiate TSI handshaker.
+ tsi_handshaker *tsi_hs = NULL;
+ ssl_create_handshaker(c->handshaker_factory, false /* is_client */,
+ NULL /* peer_name */, &tsi_hs);
+ // Create handshakers.
+ grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr);
}
static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
@@ -518,57 +465,53 @@ grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer) {
return ctx;
}
-static grpc_security_status ssl_check_peer(grpc_security_connector *sc,
- const char *peer_name,
- const tsi_peer *peer,
- grpc_auth_context **auth_context) {
+static grpc_error *ssl_check_peer(grpc_security_connector *sc,
+ const char *peer_name, const tsi_peer *peer,
+ grpc_auth_context **auth_context) {
/* Check the ALPN. */
const tsi_peer_property *p =
tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL);
if (p == NULL) {
- gpr_log(GPR_ERROR, "Missing selected ALPN property.");
- return GRPC_SECURITY_ERROR;
+ return GRPC_ERROR_CREATE(
+ "Cannot check peer: missing selected ALPN property.");
}
if (!grpc_chttp2_is_alpn_version_supported(p->value.data, p->value.length)) {
- gpr_log(GPR_ERROR, "Invalid ALPN value.");
- return GRPC_SECURITY_ERROR;
+ return GRPC_ERROR_CREATE("Cannot check peer: invalid ALPN value.");
}
/* Check the peer name if specified. */
if (peer_name != NULL && !ssl_host_matches_name(peer, peer_name)) {
- gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", peer_name);
- return GRPC_SECURITY_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "Peer name %s is not in peer certificate", peer_name);
+ grpc_error *error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
}
*auth_context = tsi_ssl_peer_to_auth_context(peer);
- return GRPC_SECURITY_OK;
+ return GRPC_ERROR_NONE;
}
static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
- grpc_security_status status;
- grpc_auth_context *auth_context = NULL;
- status = ssl_check_peer(sc, c->overridden_target_name != NULL
- ? c->overridden_target_name
- : c->target_name,
- &peer, &auth_context);
- cb(exec_ctx, user_data, status, auth_context);
- grpc_auth_context_unref(auth_context);
+ grpc_error *error = ssl_check_peer(sc, c->overridden_target_name != NULL
+ ? c->overridden_target_name
+ : c->target_name,
+ &peer, auth_context);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
tsi_peer_destruct(&peer);
}
static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
- grpc_auth_context *auth_context = NULL;
- grpc_security_status status = ssl_check_peer(sc, NULL, &peer, &auth_context);
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
+ grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context);
tsi_peer_destruct(&peer);
- cb(exec_ctx, user_data, status, auth_context);
- grpc_auth_context_unref(auth_context);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
}
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
@@ -765,7 +708,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
- c->base.do_handshake = ssl_channel_do_handshake;
+ c->base.create_handshakers = ssl_channel_create_handshakers;
gpr_split_host_port(target_name, &c->target_name, &port);
gpr_free(port);
if (overridden_target_name != NULL) {
@@ -840,8 +783,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
*sc = NULL;
goto error;
}
- gpr_mu_init(&c->base.mu);
- c->base.do_handshake = ssl_server_do_handshake;
+ c->base.create_handshakers = ssl_server_create_handshakers;
*sc = &c->base;
gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index dc02692b01..696db0e02e 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -35,6 +35,8 @@
#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_CONNECTOR_H
#include <grpc/grpc_security.h>
+
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/tsi/transport_security_interface.h"
@@ -57,21 +59,11 @@ typedef struct grpc_security_connector grpc_security_connector;
#define GRPC_SECURITY_CONNECTOR_ARG "grpc.security_connector"
-typedef void (*grpc_security_peer_check_cb)(grpc_exec_ctx *exec_ctx,
- void *user_data,
- grpc_security_status status,
- grpc_auth_context *auth_context);
-
-/* Ownership of the secure_endpoint is transfered. */
-typedef void (*grpc_security_handshake_done_cb)(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status,
- grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context);
-
typedef struct {
void (*destroy)(grpc_security_connector *sc);
void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc,
- tsi_peer peer, grpc_security_peer_check_cb cb,
- void *user_data);
+ tsi_peer peer, grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked);
} grpc_security_connector_vtable;
typedef struct grpc_security_connector_handshake_list {
@@ -106,12 +98,12 @@ void grpc_security_connector_unref(grpc_security_connector *policy);
#endif
/* Check the peer. Callee takes ownership of the peer object.
- The callback will include the resulting auth_context. */
+ Sets *auth_context and invokes on_peer_checked when done. */
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data);
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked);
/* Util to encapsulate the connector in a channel arg. */
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc);
@@ -141,11 +133,9 @@ struct grpc_channel_security_connector {
grpc_channel_security_connector *sc, const char *host,
grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
- void (*do_handshake)(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
+ void (*create_handshakers)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr);
};
/* Checks that the host that will be set for a call is acceptable. */
@@ -154,11 +144,10 @@ void grpc_channel_security_connector_check_call_host(
const char *host, grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
-/* Handshake. */
-void grpc_channel_security_connector_do_handshake(
+/* Registers handshakers with \a handshake_mgr. */
+void grpc_channel_security_connector_create_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
+ grpc_handshake_manager *handshake_mgr);
/* --- server_security_connector object. ---
@@ -169,25 +158,14 @@ typedef struct grpc_server_security_connector grpc_server_security_connector;
struct grpc_server_security_connector {
grpc_security_connector base;
- gpr_mu mu;
- grpc_security_connector_handshake_list *handshaking_handshakes;
- const grpc_channel_args *channel_args;
- void (*do_handshake)(grpc_exec_ctx *exec_ctx,
- grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
+ void (*create_handshakers)(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr);
};
-void grpc_server_security_connector_do_handshake(
+void grpc_server_security_connector_create_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
-
-void grpc_server_security_connector_shutdown(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector);
+ grpc_handshake_manager *handshake_mgr);
/* --- Creation security connectors. --- */
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
new file mode 100644
index 0000000000..fc01bec2f2
--- /dev/null
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -0,0 +1,450 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/security/transport/security_handshaker.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/security/context/security_context.h"
+#include "src/core/lib/security/transport/secure_endpoint.h"
+#include "src/core/lib/security/transport/tsi_error.h"
+
+#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
+
+typedef struct {
+ grpc_handshaker base;
+
+ // State set at creation time.
+ tsi_handshaker *handshaker;
+ grpc_security_connector *connector;
+
+ gpr_mu mu;
+ gpr_refcount refs;
+
+ bool shutdown;
+ // Endpoint and read buffer to destroy after a shutdown.
+ grpc_endpoint *endpoint_to_destroy;
+ grpc_slice_buffer *read_buffer_to_destroy;
+
+ // State saved while performing the handshake.
+ grpc_handshaker_args *args;
+ grpc_closure *on_handshake_done;
+
+ unsigned char *handshake_buffer;
+ size_t handshake_buffer_size;
+ grpc_slice_buffer left_overs;
+ grpc_slice_buffer outgoing;
+ grpc_closure on_handshake_data_sent_to_peer;
+ grpc_closure on_handshake_data_received_from_peer;
+ grpc_closure on_peer_checked;
+ grpc_auth_context *auth_context;
+} security_handshaker;
+
+static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h) {
+ if (gpr_unref(&h->refs)) {
+ gpr_mu_destroy(&h->mu);
+ tsi_handshaker_destroy(h->handshaker);
+ if (h->endpoint_to_destroy != NULL) {
+ grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy);
+ }
+ if (h->read_buffer_to_destroy != NULL) {
+ grpc_slice_buffer_destroy(h->read_buffer_to_destroy);
+ gpr_free(h->read_buffer_to_destroy);
+ }
+ gpr_free(h->handshake_buffer);
+ grpc_slice_buffer_destroy(&h->left_overs);
+ grpc_slice_buffer_destroy(&h->outgoing);
+ GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake");
+ GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake");
+ gpr_free(h);
+ }
+}
+
+// Set args fields to NULL, saving the endpoint and read buffer for
+// later destruction.
+static void cleanup_args_for_failure_locked(security_handshaker *h) {
+ h->endpoint_to_destroy = h->args->endpoint;
+ h->args->endpoint = NULL;
+ h->read_buffer_to_destroy = h->args->read_buffer;
+ h->args->read_buffer = NULL;
+ grpc_channel_args_destroy(h->args->args);
+ h->args->args = NULL;
+}
+
+// If the handshake failed or we're shutting down, clean up and invoke the
+// callback with the error.
+static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h,
+ grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) {
+ // If we were shut down after the handshake succeeded but before an
+ // endpoint callback was invoked, we need to generate our own error.
+ error = GRPC_ERROR_CREATE("Handshaker shutdown");
+ }
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg);
+ grpc_error_free_string(msg);
+ if (!h->shutdown) {
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
+ // Not shutting down, so the write failed. Clean up before
+ // invoking the callback.
+ cleanup_args_for_failure_locked(h);
+ }
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL);
+}
+
+static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ security_handshaker *h = arg;
+ gpr_mu_lock(&h->mu);
+ if (error != GRPC_ERROR_NONE || h->shutdown) {
+ security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
+ goto done;
+ }
+ // Get frame protector.
+ tsi_frame_protector *protector;
+ tsi_result result =
+ tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
+ if (result != TSI_OK) {
+ error = grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE("Frame protector creation failed"), result);
+ security_handshake_failed_locked(exec_ctx, h, error);
+ goto done;
+ }
+ // Success.
+ // Create secure endpoint.
+ h->args->endpoint = grpc_secure_endpoint_create(
+ protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count);
+ h->left_overs.count = 0;
+ h->left_overs.length = 0;
+ // Clear out the read buffer before it gets passed to the transport,
+ // since any excess bytes were already copied to h->left_overs.
+ grpc_slice_buffer_reset_and_unref(h->args->read_buffer);
+ // Add auth context to channel args.
+ grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context);
+ grpc_channel_args *tmp_args = h->args->args;
+ h->args->args =
+ grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
+ grpc_channel_args_destroy(tmp_args);
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE, NULL);
+ // Set shutdown to true so that subsequent calls to
+ // security_handshaker_shutdown() do nothing.
+ h->shutdown = true;
+done:
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+}
+
+static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h) {
+ tsi_peer peer;
+ tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
+ if (result != TSI_OK) {
+ return grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE("Peer extraction failed"), result);
+ }
+ grpc_security_connector_check_peer(exec_ctx, h->connector, peer,
+ &h->auth_context, &h->on_peer_checked);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h) {
+ // Get data to send.
+ tsi_result result = TSI_OK;
+ size_t offset = 0;
+ do {
+ size_t to_send_size = h->handshake_buffer_size - offset;
+ result = tsi_handshaker_get_bytes_to_send_to_peer(
+ h->handshaker, h->handshake_buffer + offset, &to_send_size);
+ offset += to_send_size;
+ if (result == TSI_INCOMPLETE_DATA) {
+ h->handshake_buffer_size *= 2;
+ h->handshake_buffer =
+ gpr_realloc(h->handshake_buffer, h->handshake_buffer_size);
+ }
+ } while (result == TSI_INCOMPLETE_DATA);
+ if (result != TSI_OK) {
+ return grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Handshake failed"),
+ result);
+ }
+ // Send data.
+ grpc_slice to_send =
+ grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset);
+ grpc_slice_buffer_reset_and_unref(&h->outgoing);
+ grpc_slice_buffer_add(&h->outgoing, to_send);
+ grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing,
+ &h->on_handshake_data_sent_to_peer);
+ return GRPC_ERROR_NONE;
+}
+
+static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
+ security_handshaker *h = arg;
+ gpr_mu_lock(&h->mu);
+ if (error != GRPC_ERROR_NONE || h->shutdown) {
+ security_handshake_failed_locked(
+ exec_ctx, h,
+ GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1));
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ // Process received data.
+ tsi_result result = TSI_OK;
+ size_t consumed_slice_size = 0;
+ size_t i;
+ for (i = 0; i < h->args->read_buffer->count; i++) {
+ consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
+ result = tsi_handshaker_process_bytes_from_peer(
+ h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]),
+ &consumed_slice_size);
+ if (!tsi_handshaker_is_in_progress(h->handshaker)) break;
+ }
+ if (tsi_handshaker_is_in_progress(h->handshaker)) {
+ /* We may need more data. */
+ if (result == TSI_INCOMPLETE_DATA) {
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ goto done;
+ } else {
+ error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ goto done;
+ }
+ }
+ if (result != TSI_OK) {
+ security_handshake_failed_locked(
+ exec_ctx, h, grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE("Handshake failed"), result));
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ /* Handshake is done and successful this point. */
+ bool has_left_overs_in_current_slice =
+ (consumed_slice_size <
+ GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]));
+ size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) +
+ h->args->read_buffer->count - i - 1;
+ if (num_left_overs > 0) {
+ /* Put the leftovers in our buffer (ownership transfered). */
+ if (has_left_overs_in_current_slice) {
+ grpc_slice_buffer_add(
+ &h->left_overs,
+ grpc_slice_split_tail(&h->args->read_buffer->slices[i],
+ consumed_slice_size));
+ /* split_tail above increments refcount. */
+ grpc_slice_unref(h->args->read_buffer->slices[i]);
+ }
+ grpc_slice_buffer_addn(
+ &h->left_overs, &h->args->read_buffer->slices[i + 1],
+ num_left_overs - (size_t)has_left_overs_in_current_slice);
+ }
+ // Check peer.
+ error = check_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+done:
+ gpr_mu_unlock(&h->mu);
+}
+
+static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ security_handshaker *h = arg;
+ gpr_mu_lock(&h->mu);
+ if (error != GRPC_ERROR_NONE || h->shutdown) {
+ security_handshake_failed_locked(
+ exec_ctx, h,
+ GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1));
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ /* We may be done. */
+ if (tsi_handshaker_is_in_progress(h->handshaker)) {
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ } else {
+ error = check_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ }
+ gpr_mu_unlock(&h->mu);
+}
+
+//
+// public handshaker API
+//
+
+static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {
+ security_handshaker *h = (security_handshaker *)handshaker;
+ security_handshaker_unref(exec_ctx, h);
+}
+
+static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {
+ security_handshaker *h = (security_handshaker *)handshaker;
+ gpr_mu_lock(&h->mu);
+ if (!h->shutdown) {
+ h->shutdown = true;
+ grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
+ cleanup_args_for_failure_locked(h);
+ }
+ gpr_mu_unlock(&h->mu);
+}
+
+static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker,
+ grpc_tcp_server_acceptor *acceptor,
+ grpc_closure *on_handshake_done,
+ grpc_handshaker_args *args) {
+ security_handshaker *h = (security_handshaker *)handshaker;
+ gpr_mu_lock(&h->mu);
+ h->args = args;
+ h->on_handshake_done = on_handshake_done;
+ gpr_ref(&h->refs);
+ grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ gpr_mu_unlock(&h->mu);
+}
+
+static const grpc_handshaker_vtable security_handshaker_vtable = {
+ security_handshaker_destroy, security_handshaker_shutdown,
+ security_handshaker_do_handshake};
+
+static grpc_handshaker *security_handshaker_create(
+ grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
+ grpc_security_connector *connector) {
+ security_handshaker *h = gpr_malloc(sizeof(security_handshaker));
+ memset(h, 0, sizeof(security_handshaker));
+ grpc_handshaker_init(&security_handshaker_vtable, &h->base);
+ h->handshaker = handshaker;
+ h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
+ gpr_mu_init(&h->mu);
+ gpr_ref_init(&h->refs, 1);
+ h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
+ h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
+ grpc_closure_init(&h->on_handshake_data_sent_to_peer,
+ on_handshake_data_sent_to_peer, h);
+ grpc_closure_init(&h->on_handshake_data_received_from_peer,
+ on_handshake_data_received_from_peer, h);
+ grpc_closure_init(&h->on_peer_checked, on_peer_checked, h);
+ grpc_slice_buffer_init(&h->left_overs);
+ grpc_slice_buffer_init(&h->outgoing);
+ return &h->base;
+}
+
+//
+// fail_handshaker
+//
+
+static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {
+ gpr_free(handshaker);
+}
+
+static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {}
+
+static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker,
+ grpc_tcp_server_acceptor *acceptor,
+ grpc_closure *on_handshake_done,
+ grpc_handshaker_args *args) {
+ grpc_exec_ctx_sched(exec_ctx, on_handshake_done,
+ GRPC_ERROR_CREATE("Failed to create security handshaker"),
+ NULL);
+}
+
+static const grpc_handshaker_vtable fail_handshaker_vtable = {
+ fail_handshaker_destroy, fail_handshaker_shutdown,
+ fail_handshaker_do_handshake};
+
+static grpc_handshaker *fail_handshaker_create() {
+ grpc_handshaker *h = gpr_malloc(sizeof(*h));
+ grpc_handshaker_init(&fail_handshaker_vtable, h);
+ return h;
+}
+
+//
+// exported functions
+//
+
+void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx,
+ tsi_handshaker *handshaker,
+ grpc_security_connector *connector,
+ grpc_handshake_manager *handshake_mgr) {
+ // If no TSI handshaker was created, add a handshaker that always fails.
+ // Otherwise, add a real security handshaker.
+ if (handshaker == NULL) {
+ grpc_handshake_manager_add(handshake_mgr, fail_handshaker_create());
+ } else {
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ security_handshaker_create(exec_ctx, handshaker, connector));
+ }
+}
diff --git a/src/core/lib/security/transport/handshake.h b/src/core/lib/security/transport/security_handshaker.h
index f894540515..f71f43a359 100644
--- a/src/core/lib/security/transport/handshake.h
+++ b/src/core/lib/security/transport/security_handshaker.h
@@ -31,20 +31,17 @@
*
*/
-#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H
-#define GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H
+#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
+#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/security/transport/security_connector.h"
-/* Calls the callback upon completion. Takes owership of handshaker and
- * read_buffer. */
-void grpc_do_security_handshake(
- grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
- grpc_security_connector *connector, bool is_client_side,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
+/// Creates any necessary security handshakers and adds them to
+/// \a handshake_mgr.
+void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx,
+ tsi_handshaker *handshaker,
+ grpc_security_connector *connector,
+ grpc_handshake_manager *handshake_mgr);
-void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, void *handshake);
-
-#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H */
+#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */
diff --git a/src/core/lib/support/backoff.c b/src/core/lib/support/backoff.c
index e89ef47220..0612472712 100644
--- a/src/core/lib/support/backoff.c
+++ b/src/core/lib/support/backoff.c
@@ -35,8 +35,10 @@
#include <grpc/support/useful.h>
-void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter,
+void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
+ double multiplier, double jitter,
int64_t min_timeout_millis, int64_t max_timeout_millis) {
+ backoff->initial_connect_timeout = initial_connect_timeout;
backoff->multiplier = multiplier;
backoff->jitter = jitter;
backoff->min_timeout_millis = min_timeout_millis;
@@ -45,9 +47,10 @@ void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter,
}
gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) {
- backoff->current_timeout_millis = backoff->min_timeout_millis;
- return gpr_time_add(
- now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN));
+ backoff->current_timeout_millis = backoff->initial_connect_timeout;
+ const int64_t first_timeout =
+ GPR_MAX(backoff->current_timeout_millis, backoff->min_timeout_millis);
+ return gpr_time_add(now, gpr_time_from_millis(first_timeout, GPR_TIMESPAN));
}
/* Generate a random number between 0 and 1. */
@@ -57,20 +60,28 @@ static double generate_uniform_random_number(uint32_t *rng_state) {
}
gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) {
- double new_timeout_millis =
+ const double new_timeout_millis =
backoff->multiplier * (double)backoff->current_timeout_millis;
- double jitter_range = backoff->jitter * new_timeout_millis;
- double jitter =
+ backoff->current_timeout_millis =
+ GPR_MIN((int64_t)new_timeout_millis, backoff->max_timeout_millis);
+
+ const double jitter_range_width = backoff->jitter * new_timeout_millis;
+ const double jitter =
(2 * generate_uniform_random_number(&backoff->rng_state) - 1) *
- jitter_range;
+ jitter_range_width;
+
backoff->current_timeout_millis =
- GPR_CLAMP((int64_t)(new_timeout_millis + jitter),
- backoff->min_timeout_millis, backoff->max_timeout_millis);
- return gpr_time_add(
+ (int64_t)((double)(backoff->current_timeout_millis) + jitter);
+
+ const gpr_timespec current_deadline = gpr_time_add(
now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN));
+
+ const gpr_timespec min_deadline = gpr_time_add(
+ now, gpr_time_from_millis(backoff->min_timeout_millis, GPR_TIMESPAN));
+
+ return gpr_time_max(current_deadline, min_deadline);
}
void gpr_backoff_reset(gpr_backoff *backoff) {
- // forces step() to return a timeout of min_timeout_millis
- backoff->current_timeout_millis = 0;
+ backoff->current_timeout_millis = backoff->initial_connect_timeout;
}
diff --git a/src/core/lib/support/backoff.h b/src/core/lib/support/backoff.h
index 6d40c15546..5e9b740824 100644
--- a/src/core/lib/support/backoff.h
+++ b/src/core/lib/support/backoff.h
@@ -37,7 +37,9 @@
#include <grpc/support/time.h>
typedef struct {
- /// const: multiplier between retry attempts
+ /// const: how long to wait after the first failure before retrying
+ int64_t initial_connect_timeout;
+ /// const: factor with which to multiply backoff after a failed retry
double multiplier;
/// const: amount to randomize backoffs
double jitter;
@@ -54,7 +56,8 @@ typedef struct {
} gpr_backoff;
/// Initialize backoff machinery - does not need to be destroyed
-void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter,
+void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
+ double multiplier, double jitter,
int64_t min_timeout_millis, int64_t max_timeout_millis);
/// Begin retry loop: returns a timespec for the NEXT retry