aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/context.h3
-rw-r--r--src/core/lib/channel/handshaker.c198
-rw-r--r--src/core/lib/channel/handshaker.h80
-rw-r--r--src/core/lib/http/httpcli_security_connector.c91
-rw-r--r--src/core/lib/iomgr/combiner.c11
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c585
-rw-r--r--src/core/lib/iomgr/resource_quota.c12
-rw-r--r--src/core/lib/iomgr/socket_windows.c8
-rw-r--r--src/core/lib/iomgr/socket_windows.h1
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c26
-rw-r--r--src/core/lib/iomgr/tcp_posix.c26
-rw-r--r--src/core/lib/iomgr/tcp_server.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c16
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c9
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c94
-rw-r--r--src/core/lib/iomgr/udp_server.c6
-rw-r--r--src/core/lib/security/transport/handshake.c374
-rw-r--r--src/core/lib/security/transport/security_connector.c253
-rw-r--r--src/core/lib/security/transport/security_connector.h58
-rw-r--r--src/core/lib/security/transport/security_handshaker.c450
-rw-r--r--src/core/lib/security/transport/security_handshaker.h (renamed from src/core/lib/security/transport/handshake.h)20
-rw-r--r--src/core/lib/support/backoff.c37
-rw-r--r--src/core/lib/support/backoff.h7
-rw-r--r--src/core/lib/support/subprocess_posix.c3
-rw-r--r--src/core/lib/surface/call.c4
-rw-r--r--src/core/lib/surface/completion_queue.c2
-rw-r--r--src/core/lib/transport/connectivity_state.c13
-rw-r--r--src/core/lib/transport/connectivity_state.h5
28 files changed, 1249 insertions, 1146 deletions
diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h
index 071c5f695c..6c931ad28a 100644
--- a/src/core/lib/channel/context.h
+++ b/src/core/lib/channel/context.h
@@ -47,6 +47,9 @@ typedef enum {
/// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
+ /// Reserved for traffic_class_context.
+ GRPC_CONTEXT_TRAFFIC,
+
GRPC_CONTEXT_COUNT
} grpc_context_index;
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index a45a39981c..23edc826ca 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -38,12 +38,13 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/timer.h"
//
// grpc_handshaker
//
-void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker) {
handshaker->vtable = vtable;
}
@@ -60,45 +61,43 @@ void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data) {
- handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
- read_buffer, deadline, acceptor, cb,
- user_data);
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args) {
+ handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor,
+ on_handshake_done, args);
}
//
// grpc_handshake_manager
//
-// State used while chaining handshakers.
-struct grpc_handshaker_state {
- // The index of the handshaker to invoke next.
- size_t index;
- // The deadline for all handshakers.
- gpr_timespec deadline;
- // The acceptor to call the handshakers with.
- grpc_tcp_server_acceptor* acceptor;
- // The final callback and user_data to invoke after the last handshaker.
- grpc_handshaker_done_cb final_cb;
- void* final_user_data;
-};
-
struct grpc_handshake_manager {
+ gpr_mu mu;
+ gpr_refcount refs;
+ bool shutdown;
// An array of handshakers added via grpc_handshake_manager_add().
size_t count;
grpc_handshaker** handshakers;
- // State used while chaining handshakers.
- struct grpc_handshaker_state* state;
+ // The index of the handshaker to invoke next and closure to invoke it.
+ size_t index;
+ grpc_closure call_next_handshaker;
+ // The acceptor to call the handshakers with.
+ grpc_tcp_server_acceptor* acceptor;
+ // Deadline timer across all handshakers.
+ grpc_timer deadline_timer;
+ // The final callback and user_data to invoke after the last handshaker.
+ grpc_closure on_handshake_done;
+ void* user_data;
+ // Handshaker args.
+ grpc_handshaker_args args;
};
grpc_handshake_manager* grpc_handshake_manager_create() {
grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager));
memset(mgr, 0, sizeof(*mgr));
+ gpr_mu_init(&mgr->mu);
+ gpr_ref_init(&mgr->refs, 1);
return mgr;
}
@@ -106,6 +105,7 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; }
void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
grpc_handshaker* handshaker) {
+ gpr_mu_lock(&mgr->mu);
// To avoid allocating memory for each handshaker we add, we double
// the number of elements every time we need more.
size_t realloc_count = 0;
@@ -119,85 +119,117 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*));
}
mgr->handshakers[mgr->count++] = handshaker;
+ gpr_mu_unlock(&mgr->mu);
+}
+
+static void grpc_handshake_manager_unref(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr) {
+ if (gpr_unref(&mgr->refs)) {
+ for (size_t i = 0; i < mgr->count; ++i) {
+ grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]);
+ }
+ gpr_free(mgr->handshakers);
+ gpr_mu_destroy(&mgr->mu);
+ gpr_free(mgr);
+ }
}
void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
- for (size_t i = 0; i < mgr->count; ++i) {
- grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]);
- }
- gpr_free(mgr->handshakers);
- gpr_free(mgr);
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
- for (size_t i = 0; i < mgr->count; ++i) {
- grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
+ gpr_mu_lock(&mgr->mu);
+ // Shutdown the handshaker that's currently in progress, if any.
+ if (!mgr->shutdown && mgr->index > 0) {
+ mgr->shutdown = true;
+ grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
}
- if (mgr->state != NULL) {
- gpr_free(mgr->state);
- mgr->state = NULL;
+ gpr_mu_unlock(&mgr->mu);
+}
+
+// Helper function to call either the next handshaker or the
+// on_handshake_done callback.
+// Returns true if we've scheduled the on_handshake_done callback.
+static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr,
+ grpc_error* error) {
+ GPR_ASSERT(mgr->index <= mgr->count);
+ // If we got an error or we've been shut down or we're exiting early or
+ // we've finished the last handshaker, invoke the on_handshake_done
+ // callback. Otherwise, call the next handshaker.
+ if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early ||
+ mgr->index == mgr->count) {
+ // Cancel deadline timer, since we're invoking the on_handshake_done
+ // callback now.
+ grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
+ grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL);
+ mgr->shutdown = true;
+ } else {
+ grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
+ mgr->acceptor, &mgr->call_next_handshaker,
+ &mgr->args);
}
+ ++mgr->index;
+ return mgr->shutdown;
}
// A function used as the handshaker-done callback when chaining
// handshakers together.
-static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- void* user_data, grpc_error* error) {
- grpc_handshake_manager* mgr = user_data;
- GPR_ASSERT(mgr->state != NULL);
- GPR_ASSERT(mgr->state->index < mgr->count);
- // If we got an error, skip all remaining handshakers and invoke the
- // caller-supplied callback immediately.
- if (error != GRPC_ERROR_NONE) {
- mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer,
- mgr->state->final_user_data, error);
- return;
+static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_handshake_manager* mgr = arg;
+ gpr_mu_lock(&mgr->mu);
+ bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_REF(error));
+ gpr_mu_unlock(&mgr->mu);
+ // If we're invoked the final callback, we won't be coming back
+ // to this function, so we can release our reference to the
+ // handshake manager.
+ if (done) {
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
- grpc_handshaker_done_cb cb = call_next_handshaker;
- // If this is the last handshaker, use the caller-supplied callback
- // and user_data instead of chaining back to this function again.
- if (mgr->state->index == mgr->count - 1) {
- cb = mgr->state->final_cb;
- user_data = mgr->state->final_user_data;
- }
- // Invoke handshaker.
- grpc_handshaker_do_handshake(
- exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args,
- read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data);
- ++mgr->state->index;
- // If this is the last handshaker, clean up state.
- if (mgr->state->index == mgr->count) {
- gpr_free(mgr->state);
- mgr->state = NULL;
+}
+
+// Callback invoked when deadline is exceeded.
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_handshake_manager* mgr = arg;
+ if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
+ grpc_handshake_manager_shutdown(exec_ctx, mgr);
}
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
- grpc_endpoint* endpoint, const grpc_channel_args* args,
+ grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data) {
- grpc_channel_args* args_copy = grpc_channel_args_copy(args);
- grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
- grpc_slice_buffer_init(read_buffer);
- if (mgr->count == 0) {
- // No handshakers registered, so we just immediately call the done
- // callback with the passed-in endpoint.
- cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE);
- } else {
- GPR_ASSERT(mgr->state == NULL);
- mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
- memset(mgr->state, 0, sizeof(*mgr->state));
- mgr->state->deadline = deadline;
- mgr->state->acceptor = acceptor;
- mgr->state->final_cb = cb;
- mgr->state->final_user_data = user_data;
- call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr,
- GRPC_ERROR_NONE);
+ grpc_iomgr_cb_func on_handshake_done, void* user_data) {
+ gpr_mu_lock(&mgr->mu);
+ GPR_ASSERT(mgr->index == 0);
+ GPR_ASSERT(!mgr->shutdown);
+ // Construct handshaker args. These will be passed through all
+ // handshakers and eventually be freed by the on_handshake_done callback.
+ mgr->args.endpoint = endpoint;
+ mgr->args.args = grpc_channel_args_copy(channel_args);
+ mgr->args.user_data = user_data;
+ mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer));
+ grpc_slice_buffer_init(mgr->args.read_buffer);
+ // Initialize state needed for calling handshakers.
+ mgr->acceptor = acceptor;
+ grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
+ grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);
+ // Start deadline timer, which owns a ref.
+ gpr_ref(&mgr->refs);
+ grpc_timer_init(exec_ctx, &mgr->deadline_timer,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC));
+ // Start first handshaker, which also owns a ref.
+ gpr_ref(&mgr->refs);
+ bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&mgr->mu);
+ if (done) {
+ grpc_handshake_manager_unref(exec_ctx, mgr);
}
}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index f8a36c6473..450b7adaee 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -54,15 +54,33 @@
typedef struct grpc_handshaker grpc_handshaker;
-/// Callback type invoked when a handshaker is done.
-/// Takes ownership of \a args and \a read_buffer.
-typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- void* user_data, grpc_error* error);
-
-struct grpc_handshaker_vtable {
+/// Arguments passed through handshakers and to the on_handshake_done callback.
+///
+/// For handshakers, all members are input/output parameters; for
+/// example, a handshaker may read from or write to \a endpoint and
+/// then later replace it with a wrapped endpoint. Similarly, a
+/// handshaker may modify \a args.
+///
+/// A handshaker takes ownership of the members while a handshake is in
+/// progress. Upon failure or shutdown of an in-progress handshaker,
+/// the handshaker is responsible for destroying the members and setting
+/// them to NULL before invoking the on_handshake_done callback.
+///
+/// For the on_handshake_done callback, all members are input arguments,
+/// which the callback takes ownership of.
+typedef struct {
+ grpc_endpoint* endpoint;
+ grpc_channel_args* args;
+ grpc_slice_buffer* read_buffer;
+ // A handshaker may set this to true before invoking on_handshake_done
+ // to indicate that subsequent handshakers should be skipped.
+ bool exit_early;
+ // User data passed through the handshake manager. Not used by
+ // individual handshakers.
+ void* user_data;
+} grpc_handshaker_args;
+
+typedef struct {
/// Destroys the handshaker.
void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
@@ -70,43 +88,35 @@ struct grpc_handshaker_vtable {
/// aborted in the middle).
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
- /// Performs handshaking. When finished, calls \a cb with \a user_data.
- /// Takes ownership of \a args.
- /// Takes ownership of \a read_buffer, which contains leftover bytes read
- /// from the endpoint by the previous handshaker.
+ /// Performs handshaking, modifying \a args as needed (e.g., to
+ /// replace \a endpoint with a wrapped endpoint).
+ /// When finished, invokes \a on_handshake_done.
/// \a acceptor will be NULL for client-side handshakers.
void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
- grpc_endpoint* endpoint, grpc_channel_args* args,
- grpc_slice_buffer* read_buffer, gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data);
-};
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args);
+} grpc_handshaker_vtable;
/// Base struct. To subclass, make this the first member of the
/// implementation struct.
struct grpc_handshaker {
- const struct grpc_handshaker_vtable* vtable;
+ const grpc_handshaker_vtable* vtable;
};
/// Called by concrete implementations to initialize the base struct.
-void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker);
-/// Convenient wrappers for invoking methods via the vtable.
-/// These probably do not need to be called from anywhere but
-/// grpc_handshake_manager.
void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
- grpc_endpoint* endpoint,
- grpc_channel_args* args,
- grpc_slice_buffer* read_buffer,
- gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data);
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args);
///
/// grpc_handshake_manager
@@ -134,15 +144,21 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr);
/// Invokes handshakers in the order they were added.
-/// Does NOT take ownership of \a args. Instead, makes a copy before
+/// Takes ownership of \a endpoint, and then passes that ownership to
+/// the \a on_handshake_done callback.
+/// Does NOT take ownership of \a channel_args. Instead, makes a copy before
/// invoking the first handshaker.
/// \a acceptor will be NULL for client-side handshakers.
-/// Invokes \a cb with \a user_data after either a handshaker fails or
-/// all handshakers have completed successfully.
+///
+/// When done, invokes \a on_handshake_done with a grpc_handshaker_args
+/// object as its argument. If the callback is invoked with error !=
+/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done
+/// the necessary clean-up. Otherwise, the callback takes ownership of
+/// the arguments.
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
- grpc_endpoint* endpoint, const grpc_channel_args* args,
+ grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
- grpc_handshaker_done_cb cb, void* user_data);
+ grpc_iomgr_cb_func on_handshake_done, void* user_data);
#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 24d264c32a..14cdb1dab3 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -38,7 +38,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/lib/security/transport/handshake.h"
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/security/transport/security_handshaker.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/ssl_transport_security.h"
@@ -58,52 +60,43 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
gpr_free(sc);
}
-static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc;
- tsi_result result = TSI_OK;
- tsi_handshaker *handshaker;
- if (c->handshaker_factory == NULL) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- return;
- }
- result = tsi_ssl_handshaker_factory_create_handshaker(
- c->handshaker_factory, c->secure_peer_name, &handshaker);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
- tsi_result_to_string(result));
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- } else {
- grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
- nonsecure_endpoint, read_buffer, deadline, cb,
- user_data);
+ tsi_handshaker *handshaker = NULL;
+ if (c->handshaker_factory != NULL) {
+ tsi_result result = tsi_ssl_handshaker_factory_create_handshaker(
+ c->handshaker_factory, c->secure_peer_name, &handshaker);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
+ tsi_result_to_string(result));
+ }
}
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base));
}
static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc;
- grpc_security_status status = GRPC_SECURITY_OK;
+ grpc_error *error = GRPC_ERROR_NONE;
/* Check the peer name. */
if (c->secure_peer_name != NULL &&
!tsi_ssl_peer_matches_name(&peer, c->secure_peer_name)) {
- gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate",
- c->secure_peer_name);
- status = GRPC_SECURITY_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "Peer name %s is not in peer certificate",
+ c->secure_peer_name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
}
- cb(exec_ctx, user_data, status, NULL);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
tsi_peer_destruct(&peer);
}
@@ -140,7 +133,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
- c->base.do_handshake = httpcli_ssl_do_handshake;
+ c->base.add_handshakers = httpcli_ssl_add_handshakers;
*sc = &c->base;
return GRPC_SECURITY_OK;
}
@@ -150,19 +143,25 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
typedef struct {
void (*func)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint);
void *arg;
+ grpc_handshake_manager *handshake_mgr;
} on_done_closure;
-static void on_secure_transport_setup_done(grpc_exec_ctx *exec_ctx, void *rp,
- grpc_security_status status,
- grpc_endpoint *secure_endpoint,
- grpc_auth_context *auth_context) {
- on_done_closure *c = rp;
- if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_handshaker_args *args = arg;
+ on_done_closure *c = args->user_data;
+ if (error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "Secure transport setup failed: %s", msg);
+ grpc_error_free_string(msg);
c->func(exec_ctx, c->arg, NULL);
} else {
- c->func(exec_ctx, c->arg, secure_endpoint);
+ grpc_channel_args_destroy(args->args);
+ grpc_slice_buffer_destroy(args->read_buffer);
+ gpr_free(args->read_buffer);
+ c->func(exec_ctx, c->arg, args->endpoint);
}
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
@@ -183,11 +182,15 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
}
c->func = on_done;
c->arg = arg;
+ c->handshake_mgr = grpc_handshake_manager_create();
GPR_ASSERT(httpcli_ssl_channel_security_connector_create(
pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
- grpc_channel_security_connector_do_handshake(
- exec_ctx, sc, tcp, NULL, deadline, on_secure_transport_setup_done, c);
+ grpc_channel_security_connector_add_handshakers(exec_ctx, sc,
+ c->handshake_mgr);
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline,
+ NULL /* acceptor */, on_handshake_done, c /* user_data */);
GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
}
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 60ee14eb23..cfc67020ae 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -90,6 +90,12 @@ static bool is_covered_by_poller(grpc_combiner *lock) {
gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
}
+#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
+#define IS_COVERED_BY_POLLER_ARGS(lock) \
+ (lock)->final_list_covered_by_poller, \
+ gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
+ is_covered_by_poller((lock))
+
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->next_combiner_on_this_exec_ctx = NULL;
@@ -197,9 +203,10 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
- "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
+ "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
+ " exec_ctx_ready_to_finish=%d "
"time_to_execute_final_list=%d",
- lock, lock->optional_workqueue, is_covered_by_poller(lock),
+ lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
grpc_exec_ctx_ready_to_finish(exec_ctx),
lock->time_to_execute_final_list));
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 07fbfd849e..1b15e0eb4f 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -69,6 +69,9 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
}
+/* Uncomment the following enable extra checks on poll_object operations */
+/* #define PO_DEBUG */
+
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
@@ -95,10 +98,42 @@ void grpc_use_signal(int signum) {
struct polling_island;
+typedef enum {
+ POLL_OBJ_FD,
+ POLL_OBJ_POLLSET,
+ POLL_OBJ_POLLSET_SET
+} poll_obj_type;
+
+typedef struct poll_obj {
+#ifdef PO_DEBUG
+ poll_obj_type obj_type;
+#endif
+ gpr_mu mu;
+ struct polling_island *pi;
+} poll_obj;
+
+const char *poll_obj_string(poll_obj_type po_type) {
+ switch (po_type) {
+ case POLL_OBJ_FD:
+ return "fd";
+ case POLL_OBJ_POLLSET:
+ return "pollset";
+ case POLL_OBJ_POLLSET_SET:
+ return "pollset_set";
+ }
+
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
/*******************************************************************************
* Fd Declarations
*/
+
+#define FD_FROM_PO(po) ((grpc_fd *)(po))
+
struct grpc_fd {
+ poll_obj po;
+
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -106,8 +141,6 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- gpr_mu mu;
-
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown;
@@ -120,9 +153,6 @@ struct grpc_fd {
grpc_closure *read_closure;
grpc_closure *write_closure;
- /* The polling island to which this fd belongs to (protected by mu) */
- struct polling_island *polling_island;
-
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
@@ -225,41 +255,21 @@ struct grpc_pollset_worker {
};
struct grpc_pollset {
- gpr_mu mu;
+ poll_obj po;
+
grpc_pollset_worker root_worker;
bool kicked_without_pollers;
bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure *shutdown_done; /* Called after after shutdown is complete */
-
- /* The polling island to which this pollset belongs to */
- struct polling_island *polling_island;
};
/*******************************************************************************
* Pollset-set Declarations
*/
-/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
- * directly points to a polling_island (and adding an fd/pollset/pollset_set to
- * the current pollset_set would result in polling island merges. This would
- * remove the need to maintain fd_count here. This will also significantly
- * simplify the grpc_fd structure since we would no longer need to explicitly
- * maintain the orphaned state */
struct grpc_pollset_set {
- gpr_mu mu;
-
- size_t pollset_count;
- size_t pollset_capacity;
- grpc_pollset **pollsets;
-
- size_t pollset_set_count;
- size_t pollset_set_capacity;
- struct grpc_pollset_set **pollset_sets;
-
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
+ poll_obj po;
};
/*******************************************************************************
@@ -915,7 +925,7 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
- gpr_mu_destroy(&fd->mu);
+ gpr_mu_destroy(&fd->po.mu);
gpr_free(fd);
}
gpr_mu_destroy(&fd_freelist_mu);
@@ -933,13 +943,17 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&new_fd->mu);
+ gpr_mu_init(&new_fd->po.mu);
}
- /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
- newly created fd (or an fd we got from the freelist), no one else would be
- holding a lock to it anyway. */
- gpr_mu_lock(&new_fd->mu);
+ /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
+ * is a newly created fd (or an fd we got from the freelist), no one else
+ * would be holding a lock to it anyway. */
+ gpr_mu_lock(&new_fd->po.mu);
+ new_fd->po.pi = NULL;
+#ifdef PO_DEBUG
+ new_fd->po.obj_type = POLL_OBJ_FD;
+#endif
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@@ -947,12 +961,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd->orphaned = false;
new_fd->read_closure = CLOSURE_NOT_READY;
new_fd->write_closure = CLOSURE_NOT_READY;
- new_fd->polling_island = NULL;
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
new_fd->read_notifier_pollset = NULL;
- gpr_mu_unlock(&new_fd->mu);
+ gpr_mu_unlock(&new_fd->po.mu);
char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -964,17 +977,13 @@ static grpc_fd *fd_create(int fd, const char *name) {
return new_fd;
}
-static bool fd_is_orphaned(grpc_fd *fd) {
- return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
-}
-
static int fd_wrapped_fd(grpc_fd *fd) {
int ret_fd = -1;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
if (!fd->orphaned) {
ret_fd = fd->fd;
}
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return ret_fd;
}
@@ -986,7 +995,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
@@ -1006,25 +1015,25 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* Remove the fd from the polling island:
- Get a lock on the latest polling island (i.e the last island in the
- linked list pointed by fd->polling_island). This is the island that
+ linked list pointed by fd->po.pi). This is the island that
would actually contain the fd
- Remove the fd from the latest polling island
- Unlock the latest polling island
- - Set fd->polling_island to NULL (but remove the ref on the polling island
+ - Set fd->po.pi to NULL (but remove the ref on the polling island
before doing this.) */
- if (fd->polling_island != NULL) {
- polling_island *pi_latest = polling_island_lock(fd->polling_island);
+ if (fd->po.pi != NULL) {
+ polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
- unref_pi = fd->polling_island;
- fd->polling_island = NULL;
+ unref_pi = fd->po.pi;
+ fd->po.pi = NULL;
}
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
NULL);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
if (unref_pi != NULL) {
/* Unref stale polling island here, outside the fd lock above.
@@ -1089,23 +1098,23 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
grpc_pollset *notifier = NULL;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return notifier;
}
static bool fd_is_shutdown(grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
const bool r = fd->shutdown;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return r;
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */
if (!fd->shutdown) {
fd->shutdown = true;
@@ -1116,28 +1125,28 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
}
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
- grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
- (grpc_workqueue *)fd->polling_island, "fd_get_workqueue");
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
+ grpc_workqueue *workqueue =
+ GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
+ gpr_mu_unlock(&fd->po.mu);
return workqueue;
}
@@ -1277,8 +1286,12 @@ static grpc_error *kick_poller(void) {
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- gpr_mu_init(&pollset->mu);
- *mu = &pollset->mu;
+ gpr_mu_init(&pollset->po.mu);
+ *mu = &pollset->po.mu;
+ pollset->po.pi = NULL;
+#ifdef PO_DEBUG
+ pollset->po.obj_type = POLL_OBJ_POLLSET;
+#endif
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = false;
@@ -1286,8 +1299,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = false;
pollset->finish_shutdown_called = false;
pollset->shutdown_done = NULL;
-
- pollset->polling_island = NULL;
}
/* Convert a timespec to milliseconds:
@@ -1317,26 +1328,26 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- /* Need the fd->mu since we might be racing with fd_notify_on_read */
- gpr_mu_lock(&fd->mu);
+ /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
+ gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
fd->read_notifier_pollset = notifier;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* Need the fd->mu since we might be racing with fd_notify_on_write */
- gpr_mu_lock(&fd->mu);
+ /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
+ gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
grpc_pollset *ps, char *reason) {
- if (ps->polling_island != NULL) {
- PI_UNREF(exec_ctx, ps->polling_island, reason);
+ if (ps->po.pi != NULL) {
+ PI_UNREF(exec_ctx, ps->po.pi, reason);
}
- ps->polling_island = NULL;
+ ps->po.pi = NULL;
}
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
@@ -1346,12 +1357,12 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
- /* Release the ref and set pollset->polling_island to NULL */
+ /* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
-/* pollset->mu lock must be held by the caller before calling this */
+/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_TIMER_BEGIN("pollset_shutdown", 0);
@@ -1376,7 +1387,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* here */
static void pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
- gpr_mu_destroy(&pollset->mu);
+ gpr_mu_destroy(&pollset->po.mu);
}
static void pollset_reset(grpc_pollset *pollset) {
@@ -1386,7 +1397,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
- GPR_ASSERT(pollset->polling_island == NULL);
+ GPR_ASSERT(pollset->po.pi == NULL);
}
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
@@ -1426,7 +1437,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
- latest polling island pointed by pollset->polling_island.
+ latest polling island pointed by pollset->po.pi
Since epoll_fd is immutable, we can read it without obtaining the polling
island lock. There is however a possibility that the polling island (from
@@ -1435,36 +1446,36 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
right-away from epoll_wait() and pick up the latest polling_island the next
this function (i.e pollset_work_and_unlock()) is called */
- if (pollset->polling_island == NULL) {
- pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
- if (pollset->polling_island == NULL) {
+ if (pollset->po.pi == NULL) {
+ pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
+ if (pollset->po.pi == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
}
- PI_ADD_REF(pollset->polling_island, "ps");
+ PI_ADD_REF(pollset->po.pi, "ps");
GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
- (void *)pollset, (void *)pollset->polling_island);
+ (void *)pollset, (void *)pollset->po.pi);
}
- pi = polling_island_maybe_get_latest(pollset->polling_island);
+ pi = polling_island_maybe_get_latest(pollset->po.pi);
epoll_fd = pi->epoll_fd;
- /* Update the pollset->polling_island since the island being pointed by
- pollset->polling_island maybe older than the one pointed by pi) */
- if (pollset->polling_island != pi) {
+ /* Update the pollset->po.pi since the island being pointed by
+ pollset->po.pi maybe older than the one pointed by pi) */
+ if (pollset->po.pi != pi) {
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
- PI_UNREF(exec_ctx, pollset->polling_island, "ps");
- pollset->polling_island = pi;
+ PI_UNREF(exec_ctx, pollset->po.pi, "ps");
+ pollset->po.pi = pi;
}
/* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
epoll_fd */
PI_ADD_REF(pi, "ps_work");
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&pollset->po.mu);
/* If we get some workqueue work to do, it might end up completing an item on
the completion queue, so there's no need to poll... so we skip that and
@@ -1537,17 +1548,17 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island. It
- is important to use "pi" here (i.e our old copy of pollset->polling_island
+ is important to use "pi" here (i.e our old copy of pollset->po.pi
that we got before releasing the polling island lock). This is because
- pollset->polling_island pointer might get udpated in other parts of the
+ pollset->po.pi pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
-/* pollset->mu lock must be held by the caller before calling this.
- The function pollset_work() may temporarily release the lock (pollset->mu)
+/* pollset->po.mu lock must be held by the caller before calling this.
+ The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -1617,7 +1628,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
&g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&pollset->po.mu);
/* Note: There is no need to reset worker.is_kicked to 0 since we are no
longer going to use this worker */
@@ -1637,9 +1648,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
finish_shutdown_locked(exec_ctx, pollset);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&pollset->po.mu);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&pollset->po.mu);
}
*worker_hdl = NULL;
@@ -1653,130 +1664,160 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error;
}
-static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd) {
- GPR_TIMER_BEGIN("pollset_add_fd", 0);
-
- grpc_error *error = GRPC_ERROR_NONE;
+static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
+ poll_obj_type bag_type, poll_obj *item,
+ poll_obj_type item_type) {
+ GPR_TIMER_BEGIN("add_poll_object", 0);
- gpr_mu_lock(&pollset->mu);
- gpr_mu_lock(&fd->mu);
+#ifdef PO_DEBUG
+ GPR_ASSERT(item->obj_type == item_type);
+ GPR_ASSERT(bag->obj_type == bag_type);
+#endif
+ grpc_error *error = GRPC_ERROR_NONE;
polling_island *pi_new = NULL;
+ gpr_mu_lock(&bag->mu);
+ gpr_mu_lock(&item->mu);
+
retry:
- /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
- * equal, do nothing.
- * 2) If fd->polling_island and pollset->polling_island are both NULL, create
- * a new polling island (with a refcount of 2) and make the polling_island
- * fields in both fd and pollset to point to the new island
- * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
- * the NULL polling_island field to point to the non-NULL polling_island
- * field (ensure that the refcount on the polling island is incremented by
- * 1 to account for the newly added reference)
- * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
- * and different, merge both the polling islands and update the
- * polling_island fields in both fd and pollset to point to the merged
- * polling island.
+ /*
+ * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
+ * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
+ * a refcount of 2) and point item->pi and bag->pi to the new island
+ * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
+ * the other's non-NULL pi
+ * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
+ * polling islands and update item->pi and bag->pi to point to the new
+ * island
*/
- if (fd->orphaned) {
- gpr_mu_unlock(&fd->mu);
- gpr_mu_unlock(&pollset->mu);
- /* early out */
+ /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
+ * orphaned */
+ if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
+ gpr_mu_unlock(&item->mu);
+ gpr_mu_unlock(&bag->mu);
return;
}
- if (fd->polling_island == pollset->polling_island) {
- pi_new = fd->polling_island;
+ if (item->pi == bag->pi) {
+ pi_new = item->pi;
if (pi_new == NULL) {
- /* Unlock before creating a new polling island: the polling island will
- create a workqueue which creates a file descriptor, and holding an fd
- lock here can eventually cause a loop to appear to TSAN (making it
- unhappy). We don't think it's a real loop (there's an epoch point where
- that loop possibility disappears), but the advantages of keeping TSAN
- happy outweigh any performance advantage we might have by keeping the
- lock held. */
- gpr_mu_unlock(&fd->mu);
- pi_new = polling_island_create(exec_ctx, fd, &error);
- gpr_mu_lock(&fd->mu);
- /* Need to reverify any assumptions made between the initial lock and
- getting to this branch: if they've changed, we need to throw away our
- work and figure things out again. */
- if (fd->polling_island != NULL) {
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Raced creating new polling island. pi_new: %p "
- "(fd: %d, pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
-
- /* No need to lock 'pi_new' here since this is a new polling island and
- * no one has a reference to it yet */
- polling_island_remove_all_fds_locked(pi_new, true, &error);
-
- /* Ref and unref so that the polling island gets deleted during unref */
- PI_ADD_REF(pi_new, "dance_of_destruction");
- PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
- goto retry;
+ /* GPR_ASSERT(item->pi == bag->pi == NULL) */
+
+ /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
+ * we need to do some extra work to make TSAN happy */
+ if (item_type == POLL_OBJ_FD) {
+ /* Unlock before creating a new polling island: the polling island will
+ create a workqueue which creates a file descriptor, and holding an fd
+ lock here can eventually cause a loop to appear to TSAN (making it
+ unhappy). We don't think it's a real loop (there's an epoch point
+ where that loop possibility disappears), but the advantages of
+ keeping TSAN happy outweigh any performance advantage we might have
+ by keeping the lock held. */
+ gpr_mu_unlock(&item->mu);
+ pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
+ gpr_mu_lock(&item->mu);
+
+ /* Need to reverify any assumptions made between the initial lock and
+ getting to this branch: if they've changed, we need to throw away our
+ work and figure things out again. */
+ if (item->pi != NULL) {
+ GRPC_POLLING_TRACE(
+ "add_poll_object: Raced creating new polling island. pi_new: %p "
+ "(fd: %d, %s: %p)",
+ (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
+ (void *)bag);
+ /* No need to lock 'pi_new' here since this is a new polling island
+ * and no one has a reference to it yet */
+ polling_island_remove_all_fds_locked(pi_new, true, &error);
+
+ /* Ref and unref so that the polling island gets deleted during unref
+ */
+ PI_ADD_REF(pi_new, "dance_of_destruction");
+ PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
+ goto retry;
+ }
} else {
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
- "pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
+ pi_new = polling_island_create(exec_ctx, NULL, &error);
}
+
+ GRPC_POLLING_TRACE(
+ "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
+ "%s: %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
+ } else {
+ GRPC_POLLING_TRACE(
+ "add_poll_object: Same polling island. pi: %p (%s, %s)",
+ (void *)pi_new, poll_obj_string(item_type),
+ poll_obj_string(bag_type));
+ }
+ } else if (item->pi == NULL) {
+ /* GPR_ASSERT(bag->pi != NULL) */
+ /* Make pi_new point to latest pi*/
+ pi_new = polling_island_lock(bag->pi);
+
+ if (item_type == POLL_OBJ_FD) {
+ grpc_fd *fd = FD_FROM_PO(item);
+ polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
}
- } else if (fd->polling_island == NULL) {
- pi_new = polling_island_lock(pollset->polling_island);
- polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
- gpr_mu_unlock(&pi_new->mu);
+ gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE(
- "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
- "pollset->pi: %p)",
- (void *)pi_new, fd->fd, (void *)pollset,
- (void *)pollset->polling_island);
- } else if (pollset->polling_island == NULL) {
- pi_new = polling_island_lock(fd->polling_island);
+ "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
+ "bag(%s): %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
+ } else if (bag->pi == NULL) {
+ /* GPR_ASSERT(item->pi != NULL) */
+ /* Make pi_new to point to latest pi */
+ pi_new = polling_island_lock(item->pi);
gpr_mu_unlock(&pi_new->mu);
-
GRPC_POLLING_TRACE(
- "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
- "%p, fd->pi: %p",
- (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
+ "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
+ "bag(%s): %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
} else {
- pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
- &error);
+ pi_new = polling_island_merge(item->pi, bag->pi, &error);
GRPC_POLLING_TRACE(
- "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
- "%p, fd->pi: %p, pollset->pi: %p)",
- (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
- (void *)pollset->polling_island);
+ "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
+ "bag(%s): %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
}
- /* At this point, pi_new is the polling island that both fd->polling_island
- and pollset->polling_island must be pointing to */
+ /* At this point, pi_new is the polling island that both item->pi and bag->pi
+ MUST be pointing to */
- if (fd->polling_island != pi_new) {
- PI_ADD_REF(pi_new, "fd");
- if (fd->polling_island != NULL) {
- PI_UNREF(exec_ctx, fd->polling_island, "fd");
+ if (item->pi != pi_new) {
+ PI_ADD_REF(pi_new, poll_obj_string(item_type));
+ if (item->pi != NULL) {
+ PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
}
- fd->polling_island = pi_new;
+ item->pi = pi_new;
}
- if (pollset->polling_island != pi_new) {
- PI_ADD_REF(pi_new, "ps");
- if (pollset->polling_island != NULL) {
- PI_UNREF(exec_ctx, pollset->polling_island, "ps");
+ if (bag->pi != pi_new) {
+ PI_ADD_REF(pi_new, poll_obj_string(bag_type));
+ if (bag->pi != NULL) {
+ PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
}
- pollset->polling_island = pi_new;
+ bag->pi = pi_new;
}
- gpr_mu_unlock(&fd->mu);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&item->mu);
+ gpr_mu_unlock(&bag->mu);
- GRPC_LOG_IF_ERROR("pollset_add_fd", error);
+ GRPC_LOG_IF_ERROR("add_poll_object", error);
+ GPR_TIMER_END("add_poll_object", 0);
+}
- GPR_TIMER_END("pollset_add_fd", 0);
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
+ POLL_OBJ_FD);
}
/*******************************************************************************
@@ -1784,142 +1825,60 @@ retry:
*/
static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
- memset(pollset_set, 0, sizeof(*pollset_set));
- gpr_mu_init(&pollset_set->mu);
- return pollset_set;
+ grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
+ gpr_mu_init(&pss->po.mu);
+ pss->po.pi = NULL;
+#ifdef PO_DEBUG
+ pss->po.obj_type = POLL_OBJ_POLLSET_SET;
+#endif
+ return pss;
}
-static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
- size_t i;
- gpr_mu_destroy(&pollset_set->mu);
- for (i = 0; i < pollset_set->fd_count; i++) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+static void pollset_set_destroy(grpc_pollset_set *pss) {
+ gpr_mu_destroy(&pss->po.mu);
+
+ if (pss->po.pi != NULL) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy");
+ grpc_exec_ctx_finish(&exec_ctx);
}
- gpr_free(pollset_set->pollsets);
- gpr_free(pollset_set->pollset_sets);
- gpr_free(pollset_set->fds);
- gpr_free(pollset_set);
+
+ gpr_free(pss);
}
-static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- if (pollset_set->fd_count == pollset_set->fd_capacity) {
- pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
- pollset_set->fds = gpr_realloc(
- pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
- }
- GRPC_FD_REF(fd, "pollset_set");
- pollset_set->fds[pollset_set->fd_count++] = fd;
- for (i = 0; i < pollset_set->pollset_count; i++) {
- pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
- }
- for (i = 0; i < pollset_set->pollset_set_count; i++) {
- pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
- }
- gpr_mu_unlock(&pollset_set->mu);
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+ grpc_fd *fd) {
+ add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
+ POLL_OBJ_FD);
}
-static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- for (i = 0; i < pollset_set->fd_count; i++) {
- if (pollset_set->fds[i] == fd) {
- pollset_set->fd_count--;
- GPR_SWAP(grpc_fd *, pollset_set->fds[i],
- pollset_set->fds[pollset_set->fd_count]);
- GRPC_FD_UNREF(fd, "pollset_set");
- break;
- }
- }
- for (i = 0; i < pollset_set->pollset_set_count; i++) {
- pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
- }
- gpr_mu_unlock(&pollset_set->mu);
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+ grpc_fd *fd) {
+ /* Nothing to do */
}
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
- size_t i, j;
- gpr_mu_lock(&pollset_set->mu);
- if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
- pollset_set->pollset_capacity =
- GPR_MAX(8, 2 * pollset_set->pollset_capacity);
- pollset_set->pollsets =
- gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
- sizeof(*pollset_set->pollsets));
- }
- pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
- for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
- if (fd_is_orphaned(pollset_set->fds[i])) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
- } else {
- pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
- pollset_set->fds[j++] = pollset_set->fds[i];
- }
- }
- pollset_set->fd_count = j;
- gpr_mu_unlock(&pollset_set->mu);
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
+ POLL_OBJ_POLLSET);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- for (i = 0; i < pollset_set->pollset_count; i++) {
- if (pollset_set->pollsets[i] == pollset) {
- pollset_set->pollset_count--;
- GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
- pollset_set->pollsets[pollset_set->pollset_count]);
- break;
- }
- }
- gpr_mu_unlock(&pollset_set->mu);
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ /* Nothing to do */
}
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
- size_t i, j;
- gpr_mu_lock(&bag->mu);
- if (bag->pollset_set_count == bag->pollset_set_capacity) {
- bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
- bag->pollset_sets =
- gpr_realloc(bag->pollset_sets,
- bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
- }
- bag->pollset_sets[bag->pollset_set_count++] = item;
- for (i = 0, j = 0; i < bag->fd_count; i++) {
- if (fd_is_orphaned(bag->fds[i])) {
- GRPC_FD_UNREF(bag->fds[i], "pollset_set");
- } else {
- pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
- bag->fds[j++] = bag->fds[i];
- }
- }
- bag->fd_count = j;
- gpr_mu_unlock(&bag->mu);
+ add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
+ POLL_OBJ_POLLSET_SET);
}
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
- size_t i;
- gpr_mu_lock(&bag->mu);
- for (i = 0; i < bag->pollset_set_count; i++) {
- if (bag->pollset_sets[i] == item) {
- bag->pollset_set_count--;
- GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
- bag->pollset_sets[bag->pollset_set_count]);
- break;
- }
- }
- gpr_mu_unlock(&bag->mu);
+ /* Nothing to do */
}
/* Test helper functions
@@ -1927,9 +1886,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi;
- gpr_mu_lock(&fd->mu);
- pi = fd->polling_island;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
+ pi = fd->po.pi;
+ gpr_mu_unlock(&fd->po.mu);
return pi;
}
@@ -1937,9 +1896,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) {
void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
polling_island *pi;
- gpr_mu_lock(&ps->mu);
- pi = ps->polling_island;
- gpr_mu_unlock(&ps->mu);
+ gpr_mu_lock(&ps->po.mu);
+ pi = ps->po.pi;
+ gpr_mu_unlock(&ps->po.mu);
return pi;
}
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 379bf9bd23..213d29600c 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -144,6 +144,12 @@ struct grpc_resource_quota {
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
+ /* This is only really usable for debugging: it's always a stale pointer, but
+ a stale pointer that might just be fresh enough to guide us to where the
+ reclamation system is stuck */
+ grpc_closure *debug_only_last_initiated_reclaimer;
+ grpc_resource_user *debug_only_last_reclaimer_resource_user;
+
/* Roots of all resource user lists */
grpc_resource_user *roots[GRPC_RULIST_COUNT];
@@ -225,6 +231,7 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) {
resource_user->links[list].prev;
resource_user->links[list].prev->links[list].next =
resource_user->links[list].next;
+ resource_user->links[list].next = resource_user->links[list].prev = NULL;
}
/*******************************************************************************
@@ -340,6 +347,9 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
resource_quota->reclaiming = true;
grpc_resource_quota_internal_ref(resource_quota);
grpc_closure *c = resource_user->reclaimers[destructive];
+ GPR_ASSERT(c);
+ resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
+ resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = NULL;
grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
return true;
@@ -476,6 +486,8 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
GRPC_ERROR_CANCELLED, NULL);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
+ rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
+ rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}
static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 35f23300dc..54911e0e31 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -76,6 +76,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
LPFN_DISCONNECTEX DisconnectEx;
DWORD ioctl_num_bytes;
+ gpr_mu_lock(&winsocket->state_mu);
+ if (winsocket->shutdown_called) {
+ gpr_mu_unlock(&winsocket->state_mu);
+ return;
+ }
+ winsocket->shutdown_called = true;
+ gpr_mu_unlock(&winsocket->state_mu);
+
status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx),
&ioctl_num_bytes, NULL, NULL);
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 490d0e0a06..a3875ce16c 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -87,6 +87,7 @@ typedef struct grpc_winsocket {
grpc_winsocket_callback_info read_info;
gpr_mu state_mu;
+ bool shutdown_called;
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 4d1e809872..1127588ebc 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -107,18 +107,22 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
gpr_mu_lock(&ac->mu);
- if (error == GRPC_ERROR_NONE && socket != NULL) {
- DWORD transfered_bytes = 0;
- DWORD flags;
- BOOL wsa_success =
- WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
- &transfered_bytes, FALSE, &flags);
- GPR_ASSERT(transfered_bytes == 0);
- if (!wsa_success) {
- error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ if (error == GRPC_ERROR_NONE) {
+ if (socket != NULL) {
+ DWORD transfered_bytes = 0;
+ DWORD flags;
+ BOOL wsa_success =
+ WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
+ &transfered_bytes, FALSE, &flags);
+ GPR_ASSERT(transfered_bytes == 0);
+ if (!wsa_success) {
+ error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
+ } else {
+ *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
+ socket = NULL;
+ }
} else {
- *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
- socket = NULL;
+ error = GRPC_ERROR_CREATE("socket is null");
}
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 12a4797e6f..540305e4fa 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -107,6 +107,12 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
+ GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string);
+}
+
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
@@ -230,13 +236,15 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
+ call_read_cb(exec_ctx, tcp,
+ tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed"));
+ call_read_cb(exec_ctx, tcp,
+ tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
@@ -365,8 +373,13 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
return false;
+ } else if (errno == EPIPE) {
+ *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ return true;
} else {
- *error = GRPC_OS_ERROR(errno, "sendmsg");
+ *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
return true;
}
}
@@ -447,9 +460,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
- ? GRPC_ERROR_CREATE("EOF")
- : GRPC_ERROR_NONE,
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
+ : GRPC_ERROR_NONE,
NULL);
return;
}
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 6eba8c4057..437a94beff 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -52,7 +52,8 @@ typedef struct grpc_tcp_server_acceptor {
unsigned fd_index;
} grpc_tcp_server_acceptor;
-/* Called for newly connected TCP connections. */
+/* Called for newly connected TCP connections.
+ Takes ownership of acceptor. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
grpc_pollset *accepting_pollset,
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 7e2fb0f1f9..179f47ef76 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -381,16 +381,12 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
- sp->fd_index};
- grpc_pollset *read_notifier_pollset = NULL;
- grpc_fd *fdobj;
if (err != GRPC_ERROR_NONE) {
goto error;
}
- read_notifier_pollset =
+ grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
@@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- fdobj = grpc_fd_create(fd, name);
+ grpc_fd *fdobj = grpc_fd_create(fd, name);
if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
@@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = sp->fd_index;
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- read_notifier_pollset, &acceptor);
+ read_notifier_pollset, acceptor);
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index b5b9b92a20..e1a174cfa2 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -188,7 +188,6 @@ static void accepted_connection_close_cb(uv_handle_t *handle) {
static void on_connect(uv_stream_t *server, int status) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
uv_tcp_t *client;
grpc_endpoint *ep = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -201,6 +200,7 @@ static void on_connect(uv_stream_t *server, int status) {
uv_strerror(status));
return;
}
+
client = gpr_malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client);
// UV documentation says this is guaranteed to succeed
@@ -220,8 +220,13 @@ static void on_connect(uv_stream_t *server, int status) {
gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
}
ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- &acceptor);
+ acceptor);
grpc_exec_ctx_finish(&exec_ctx);
}
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index ae54c70d2d..b0c8586bac 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -73,6 +73,7 @@ struct grpc_tcp_listener {
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
+ int outstanding_calls;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
/* linked list */
@@ -140,10 +141,9 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
- }
+static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_tcp_server *s = arg;
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -159,6 +159,16 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
+static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ }
+
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
+ GRPC_ERROR_NONE, NULL);
+}
+
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
gpr_ref_non_zero(&s->refs);
return s;
@@ -180,17 +190,14 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
- immediately_done = 1;
- }
- for (sp = s->head; sp; sp = sp->next) {
- sp->shutting_down = 1;
- grpc_winsocket_shutdown(sp->socket);
+ finish_shutdown_locked(exec_ctx, s);
+ } else {
+ for (sp = s->head; sp; sp = sp->next) {
+ sp->shutting_down = 1;
+ grpc_winsocket_shutdown(sp->socket);
+ }
}
gpr_mu_unlock(&s->mu);
-
- if (immediately_done) {
- finish_shutdown(exec_ctx, s);
- }
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
@@ -251,31 +258,30 @@ failure:
return error;
}
-static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *sp) {
+static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *sp) {
int notify = 0;
sp->shutting_down = 0;
- gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
- notify = 1;
- }
- gpr_mu_unlock(&sp->server->mu);
- if (notify) {
- finish_shutdown(exec_ctx, sp->server);
+ finish_shutdown_locked(exec_ctx, sp->server);
}
}
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
- grpc_tcp_listener *port) {
+static grpc_error *start_accept_locked(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
DWORD bytes_received = 0;
grpc_error *error = GRPC_ERROR_NONE;
+ if (port->shutting_down) {
+ return GRPC_ERROR_NONE;
+ }
+
sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (sock == INVALID_SOCKET) {
@@ -305,20 +311,11 @@ static grpc_error *start_accept(grpc_exec_ctx *exec_ctx,
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
+ port->outstanding_calls++;
return error;
failure:
GPR_ASSERT(error != GRPC_ERROR_NONE);
- if (port->shutting_down) {
- /* We are abandoning the listener port, take that into account to prevent
- occasional hangs on shutdown. The hang happens when sp->shutting_down
- change is not seen by on_accept and we proceed to trying new accept,
- but we fail there because the listening port has been closed in the
- meantime. */
- decrement_active_ports_and_notify(exec_ctx, port);
- GRPC_ERROR_UNREF(error);
- return GRPC_ERROR_NONE;
- }
if (sock != INVALID_SOCKET) closesocket(sock);
return error;
}
@@ -326,7 +323,6 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@@ -338,6 +334,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
BOOL wsa_success;
int err;
+ gpr_mu_lock(&sp->server->mu);
+
peer_name.len = sizeof(struct sockaddr_storage);
/* The general mechanism for shutting down is to queue abortion calls. While
@@ -347,6 +345,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
grpc_error_free_string(msg);
+ gpr_mu_unlock(&sp->server->mu);
return;
}
@@ -356,17 +355,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE, &flags);
if (!wsa_success) {
- if (sp->shutting_down) {
- /* During the shutdown case, we ARE expecting an error. So that's well,
- and we can wake up the shutdown thread. */
- decrement_active_ports_and_notify(exec_ctx, sp);
- return;
- } else {
+ if (!sp->shutting_down) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
gpr_free(utf8_message);
- closesocket(sock);
}
+ closesocket(sock);
} else {
if (!sp->shutting_down) {
peer_name_string = NULL;
@@ -401,14 +395,24 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) {
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- &acceptor);
+ acceptor);
}
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
+ if (0 == --sp->outstanding_calls) {
+ decrement_active_ports_and_notify_locked(exec_ctx, sp);
+ }
+ gpr_mu_unlock(&sp->server->mu);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -456,6 +460,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
+ sp->outstanding_calls = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
@@ -553,7 +558,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (sp = s->head; sp; sp = sp->next) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept(exec_ctx, sp)));
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(exec_ctx, sp)));
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index fd0c7a0f9d..3c24ea9afa 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -388,7 +388,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
/* Try listening on IPv6 first. */
addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
@@ -402,7 +403,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
}
// TODO(rjshade): Test and propagate the returned grpc_error*:
- grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
+ &dsmode, &fd));
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}
diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c
deleted file mode 100644
index 9623797610..0000000000
--- a/src/core/lib/security/transport/handshake.c
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/security/transport/handshake.h"
-
-#include <stdbool.h>
-#include <string.h>
-
-#include <grpc/slice_buffer.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/security/context/security_context.h"
-#include "src/core/lib/security/transport/secure_endpoint.h"
-#include "src/core/lib/security/transport/tsi_error.h"
-
-#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
-
-typedef struct {
- grpc_security_connector *connector;
- tsi_handshaker *handshaker;
- bool is_client_side;
- unsigned char *handshake_buffer;
- size_t handshake_buffer_size;
- grpc_endpoint *wrapped_endpoint;
- grpc_endpoint *secure_endpoint;
- grpc_slice_buffer left_overs;
- grpc_slice_buffer incoming;
- grpc_slice_buffer outgoing;
- grpc_security_handshake_done_cb cb;
- void *user_data;
- grpc_closure on_handshake_data_sent_to_peer;
- grpc_closure on_handshake_data_received_from_peer;
- grpc_auth_context *auth_context;
- grpc_timer timer;
- gpr_refcount refs;
-} grpc_security_handshake;
-
-static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *setup,
- grpc_error *error);
-
-static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
- grpc_error *error);
-
-static void security_connector_remove_handshake(grpc_security_handshake *h) {
- GPR_ASSERT(!h->is_client_side);
- grpc_security_connector_handshake_list *node;
- grpc_security_connector_handshake_list *tmp;
- grpc_server_security_connector *sc =
- (grpc_server_security_connector *)h->connector;
- gpr_mu_lock(&sc->mu);
- node = sc->handshaking_handshakes;
- if (node && node->handshake == h) {
- sc->handshaking_handshakes = node->next;
- gpr_free(node);
- gpr_mu_unlock(&sc->mu);
- return;
- }
- while (node) {
- if (node->next->handshake == h) {
- tmp = node->next;
- node->next = node->next->next;
- gpr_free(tmp);
- gpr_mu_unlock(&sc->mu);
- return;
- }
- node = node->next;
- }
- gpr_mu_unlock(&sc->mu);
-}
-
-static void unref_handshake(grpc_security_handshake *h) {
- if (gpr_unref(&h->refs)) {
- if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker);
- if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer);
- grpc_slice_buffer_destroy(&h->left_overs);
- grpc_slice_buffer_destroy(&h->outgoing);
- grpc_slice_buffer_destroy(&h->incoming);
- GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake");
- GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake");
- gpr_free(h);
- }
-}
-
-static void security_handshake_done(grpc_exec_ctx *exec_ctx,
- grpc_security_handshake *h,
- grpc_error *error) {
- grpc_timer_cancel(exec_ctx, &h->timer);
- if (!h->is_client_side) {
- security_connector_remove_handshake(h);
- }
- if (error == GRPC_ERROR_NONE) {
- h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint,
- h->auth_context);
- } else {
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg);
- grpc_error_free_string(msg);
-
- if (h->secure_endpoint != NULL) {
- grpc_endpoint_shutdown(exec_ctx, h->secure_endpoint);
- grpc_endpoint_destroy(exec_ctx, h->secure_endpoint);
- } else {
- grpc_endpoint_destroy(exec_ctx, h->wrapped_endpoint);
- }
- h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- }
- unref_handshake(h);
- GRPC_ERROR_UNREF(error);
-}
-
-static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_security_status status,
- grpc_auth_context *auth_context) {
- grpc_security_handshake *h = user_data;
- tsi_frame_protector *protector;
- tsi_result result;
- if (status != GRPC_SECURITY_OK) {
- security_handshake_done(
- exec_ctx, h,
- grpc_error_set_int(GRPC_ERROR_CREATE("Error checking peer."),
- GRPC_ERROR_INT_SECURITY_STATUS, status));
- return;
- }
- h->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "handshake");
- result =
- tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
- if (result != TSI_OK) {
- security_handshake_done(
- exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Frame protector creation failed"), result));
- return;
- }
- h->secure_endpoint =
- grpc_secure_endpoint_create(protector, h->wrapped_endpoint,
- h->left_overs.slices, h->left_overs.count);
- h->left_overs.count = 0;
- h->left_overs.length = 0;
- security_handshake_done(exec_ctx, h, GRPC_ERROR_NONE);
- return;
-}
-
-static void check_peer(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h) {
- tsi_peer peer;
- tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
-
- if (result != TSI_OK) {
- security_handshake_done(
- exec_ctx, h, grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Peer extraction failed"), result));
- return;
- }
- grpc_security_connector_check_peer(exec_ctx, h->connector, peer,
- on_peer_checked, h);
-}
-
-static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx,
- grpc_security_handshake *h) {
- size_t offset = 0;
- tsi_result result = TSI_OK;
- grpc_slice to_send;
-
- do {
- size_t to_send_size = h->handshake_buffer_size - offset;
- result = tsi_handshaker_get_bytes_to_send_to_peer(
- h->handshaker, h->handshake_buffer + offset, &to_send_size);
- offset += to_send_size;
- if (result == TSI_INCOMPLETE_DATA) {
- h->handshake_buffer_size *= 2;
- h->handshake_buffer =
- gpr_realloc(h->handshake_buffer, h->handshake_buffer_size);
- }
- } while (result == TSI_INCOMPLETE_DATA);
-
- if (result != TSI_OK) {
- security_handshake_done(exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Handshake failed"), result));
- return;
- }
-
- to_send =
- grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset);
- grpc_slice_buffer_reset_and_unref(&h->outgoing);
- grpc_slice_buffer_add(&h->outgoing, to_send);
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- grpc_endpoint_write(exec_ctx, h->wrapped_endpoint, &h->outgoing,
- &h->on_handshake_data_sent_to_peer);
-}
-
-static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *handshake,
- grpc_error *error) {
- grpc_security_handshake *h = handshake;
- size_t consumed_slice_size = 0;
- tsi_result result = TSI_OK;
- size_t i;
- size_t num_left_overs;
- int has_left_overs_in_current_slice = 0;
-
- if (error != GRPC_ERROR_NONE) {
- security_handshake_done(
- exec_ctx, h,
- GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1));
- return;
- }
-
- for (i = 0; i < h->incoming.count; i++) {
- consumed_slice_size = GRPC_SLICE_LENGTH(h->incoming.slices[i]);
- result = tsi_handshaker_process_bytes_from_peer(
- h->handshaker, GRPC_SLICE_START_PTR(h->incoming.slices[i]),
- &consumed_slice_size);
- if (!tsi_handshaker_is_in_progress(h->handshaker)) break;
- }
-
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
- /* We may need more data. */
- if (result == TSI_INCOMPLETE_DATA) {
- grpc_endpoint_read(exec_ctx, h->wrapped_endpoint, &h->incoming,
- &h->on_handshake_data_received_from_peer);
- return;
- } else {
- send_handshake_bytes_to_peer(exec_ctx, h);
- return;
- }
- }
-
- if (result != TSI_OK) {
- security_handshake_done(exec_ctx, h,
- grpc_set_tsi_error_result(
- GRPC_ERROR_CREATE("Handshake failed"), result));
- return;
- }
-
- /* Handshake is done and successful this point. */
- has_left_overs_in_current_slice =
- (consumed_slice_size < GRPC_SLICE_LENGTH(h->incoming.slices[i]));
- num_left_overs =
- (has_left_overs_in_current_slice ? 1 : 0) + h->incoming.count - i - 1;
- if (num_left_overs == 0) {
- check_peer(exec_ctx, h);
- return;
- }
-
- /* Put the leftovers in our buffer (ownership transfered). */
- if (has_left_overs_in_current_slice) {
- grpc_slice_buffer_add(
- &h->left_overs,
- grpc_slice_split_tail(&h->incoming.slices[i], consumed_slice_size));
- grpc_slice_unref(
- h->incoming.slices[i]); /* split_tail above increments refcount. */
- }
- grpc_slice_buffer_addn(
- &h->left_overs, &h->incoming.slices[i + 1],
- num_left_overs - (size_t)has_left_overs_in_current_slice);
- check_peer(exec_ctx, h);
-}
-
-/* If handshake is NULL, the handshake is done. */
-static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, grpc_error *error) {
- grpc_security_handshake *h = handshake;
-
- /* Make sure that write is OK. */
- if (error != GRPC_ERROR_NONE) {
- if (handshake != NULL)
- security_handshake_done(
- exec_ctx, h,
- GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1));
- return;
- }
-
- /* We may be done. */
- if (tsi_handshaker_is_in_progress(h->handshaker)) {
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- grpc_endpoint_read(exec_ctx, h->wrapped_endpoint, &h->incoming,
- &h->on_handshake_data_received_from_peer);
- } else {
- check_peer(exec_ctx, h);
- }
-}
-
-static void on_timeout(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_security_handshake *h = arg;
- if (error == GRPC_ERROR_NONE) {
- grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint);
- }
- unref_handshake(h);
-}
-
-void grpc_do_security_handshake(
- grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
- grpc_security_connector *connector, bool is_client_side,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb,
- void *user_data) {
- grpc_security_connector_handshake_list *handshake_node;
- grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake));
- memset(h, 0, sizeof(grpc_security_handshake));
- h->handshaker = handshaker;
- h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
- h->is_client_side = is_client_side;
- h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
- h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
- h->wrapped_endpoint = nonsecure_endpoint;
- h->user_data = user_data;
- h->cb = cb;
- gpr_ref_init(&h->refs, 2); /* timer and handshake proper each get a ref */
- grpc_closure_init(&h->on_handshake_data_sent_to_peer,
- on_handshake_data_sent_to_peer, h);
- grpc_closure_init(&h->on_handshake_data_received_from_peer,
- on_handshake_data_received_from_peer, h);
- grpc_slice_buffer_init(&h->left_overs);
- grpc_slice_buffer_init(&h->outgoing);
- grpc_slice_buffer_init(&h->incoming);
- if (read_buffer != NULL) {
- grpc_slice_buffer_move_into(read_buffer, &h->incoming);
- gpr_free(read_buffer);
- }
- if (!is_client_side) {
- grpc_server_security_connector *server_connector =
- (grpc_server_security_connector *)connector;
- handshake_node = gpr_malloc(sizeof(grpc_security_connector_handshake_list));
- handshake_node->handshake = h;
- gpr_mu_lock(&server_connector->mu);
- handshake_node->next = server_connector->handshaking_handshakes;
- server_connector->handshaking_handshakes = handshake_node;
- gpr_mu_unlock(&server_connector->mu);
- }
- send_handshake_bytes_to_peer(exec_ctx, h);
- grpc_timer_init(exec_ctx, &h->timer,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- on_timeout, h, gpr_now(GPR_CLOCK_MONOTONIC));
-}
-
-void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx,
- void *handshake) {
- grpc_security_handshake *h = handshake;
- grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint);
-}
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 0fbd63a7e1..5b088aa58d 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -43,11 +43,12 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/alpn/alpn.h"
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/transport/handshake.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
+#include "src/core/lib/security/transport/security_handshaker.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/tsi/fake_transport_security.h"
@@ -111,58 +112,34 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
return NULL;
}
-void grpc_server_security_connector_shutdown(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector) {
- grpc_security_connector_handshake_list *tmp;
- gpr_mu_lock(&connector->mu);
- while (connector->handshaking_handshakes) {
- tmp = connector->handshaking_handshakes;
- grpc_security_handshake_shutdown(
- exec_ctx, connector->handshaking_handshakes->handshake);
- connector->handshaking_handshakes = tmp->next;
- gpr_free(tmp);
+void grpc_channel_security_connector_add_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
+ grpc_handshake_manager *handshake_mgr) {
+ if (connector != NULL) {
+ connector->add_handshakers(exec_ctx, connector, handshake_mgr);
}
- gpr_mu_unlock(&connector->mu);
}
-void grpc_channel_security_connector_do_handshake(
- grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb,
- void *user_data) {
- if (sc == NULL || nonsecure_endpoint == NULL) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- } else {
- sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, read_buffer, deadline,
- cb, user_data);
- }
-}
-
-void grpc_server_security_connector_do_handshake(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
- if (sc == NULL || nonsecure_endpoint == NULL) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
- } else {
- sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, read_buffer,
- deadline, cb, user_data);
+void grpc_server_security_connector_add_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector,
+ grpc_handshake_manager *handshake_mgr) {
+ if (connector != NULL) {
+ connector->add_handshakers(exec_ctx, connector, handshake_mgr);
}
}
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
if (sc == NULL) {
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, on_peer_checked,
+ GRPC_ERROR_CREATE("cannot check peer -- no security connector"), NULL);
tsi_peer_destruct(&peer);
} else {
- sc->vtable->check_peer(exec_ctx, sc, peer, cb, user_data);
+ sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
}
}
@@ -262,45 +239,41 @@ static void fake_channel_destroy(grpc_security_connector *sc) {
gpr_free(sc);
}
-static void fake_server_destroy(grpc_security_connector *sc) {
- grpc_server_security_connector *c = (grpc_server_security_connector *)sc;
- gpr_mu_destroy(&c->mu);
- gpr_free(sc);
-}
+static void fake_server_destroy(grpc_security_connector *sc) { gpr_free(sc); }
static void fake_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb, void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
const char *prop_name;
- grpc_security_status status = GRPC_SECURITY_OK;
- grpc_auth_context *auth_context = NULL;
+ grpc_error *error = GRPC_ERROR_NONE;
+ *auth_context = NULL;
if (peer.property_count != 1) {
- gpr_log(GPR_ERROR, "Fake peers should only have 1 property.");
- status = GRPC_SECURITY_ERROR;
+ error = GRPC_ERROR_CREATE("Fake peers should only have 1 property.");
goto end;
}
prop_name = peer.properties[0].name;
if (prop_name == NULL ||
strcmp(prop_name, TSI_CERTIFICATE_TYPE_PEER_PROPERTY)) {
- gpr_log(GPR_ERROR, "Unexpected property in fake peer: %s.",
- prop_name == NULL ? "<EMPTY>" : prop_name);
- status = GRPC_SECURITY_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "Unexpected property in fake peer: %s.",
+ prop_name == NULL ? "<EMPTY>" : prop_name);
+ error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
goto end;
}
if (strncmp(peer.properties[0].value.data, TSI_FAKE_CERTIFICATE_TYPE,
peer.properties[0].value.length)) {
- gpr_log(GPR_ERROR, "Invalid value for cert type property.");
- status = GRPC_SECURITY_ERROR;
+ error = GRPC_ERROR_CREATE("Invalid value for cert type property.");
goto end;
}
- auth_context = grpc_auth_context_create(NULL);
+ *auth_context = grpc_auth_context_create(NULL);
grpc_auth_context_add_cstring_property(
- auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
+ *auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
- cb(exec_ctx, user_data, status, auth_context);
- grpc_auth_context_unref(auth_context);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
tsi_peer_destruct(&peer);
}
@@ -313,26 +286,24 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
cb(exec_ctx, user_data, GRPC_SECURITY_OK);
}
-static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
- grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), &sc->base,
- true, nonsecure_endpoint, read_buffer, deadline,
- cb, user_data);
+static void fake_channel_add_handshakers(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
+ &sc->base));
}
-static void fake_server_do_handshake(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
- grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), &sc->base,
- false, nonsecure_endpoint, read_buffer, deadline,
- cb, user_data);
+static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
+ &sc->base));
}
static grpc_security_connector_vtable fake_channel_vtable = {
@@ -350,7 +321,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
c->base.vtable = &fake_channel_vtable;
c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds);
c->check_call_host = fake_channel_check_call_host;
- c->do_handshake = fake_channel_do_handshake;
+ c->add_handshakers = fake_channel_add_handshakers;
return c;
}
@@ -362,8 +333,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create(
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
- c->do_handshake = fake_server_do_handshake;
- gpr_mu_init(&c->mu);
+ c->add_handshakers = fake_server_add_handshakers;
return c;
}
@@ -396,11 +366,9 @@ static void ssl_channel_destroy(grpc_security_connector *sc) {
static void ssl_server_destroy(grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
-
if (c->handshaker_factory != NULL) {
tsi_ssl_handshaker_factory_destroy(c->handshaker_factory);
}
- gpr_mu_destroy(&c->base.mu);
gpr_free(sc);
}
@@ -419,49 +387,35 @@ static grpc_security_status ssl_create_handshaker(
return GRPC_SECURITY_OK;
}
-static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
- tsi_handshaker *handshaker;
- grpc_security_status status = ssl_create_handshaker(
- c->handshaker_factory, true,
- c->overridden_target_name != NULL ? c->overridden_target_name
- : c->target_name,
- &handshaker);
- if (status != GRPC_SECURITY_OK) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, status, NULL, NULL);
- } else {
- grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
- nonsecure_endpoint, read_buffer, deadline, cb,
- user_data);
- }
-}
-
-static void ssl_server_do_handshake(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
+ // Instantiate TSI handshaker.
+ tsi_handshaker *tsi_hs = NULL;
+ ssl_create_handshaker(c->handshaker_factory, true /* is_client */,
+ c->overridden_target_name != NULL
+ ? c->overridden_target_name
+ : c->target_name,
+ &tsi_hs);
+ // Create handshakers.
+ grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
+ exec_ctx, tsi_hs, &sc->base));
+}
+
+static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
- tsi_handshaker *handshaker;
- grpc_security_status status =
- ssl_create_handshaker(c->handshaker_factory, false, NULL, &handshaker);
- if (status != GRPC_SECURITY_OK) {
- gpr_free(read_buffer);
- cb(exec_ctx, user_data, status, NULL, NULL);
- } else {
- grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, false,
- nonsecure_endpoint, read_buffer, deadline, cb,
- user_data);
- }
+ // Instantiate TSI handshaker.
+ tsi_handshaker *tsi_hs = NULL;
+ ssl_create_handshaker(c->handshaker_factory, false /* is_client */,
+ NULL /* peer_name */, &tsi_hs);
+ // Create handshakers.
+ grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
+ exec_ctx, tsi_hs, &sc->base));
}
static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
@@ -518,57 +472,53 @@ grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer) {
return ctx;
}
-static grpc_security_status ssl_check_peer(grpc_security_connector *sc,
- const char *peer_name,
- const tsi_peer *peer,
- grpc_auth_context **auth_context) {
+static grpc_error *ssl_check_peer(grpc_security_connector *sc,
+ const char *peer_name, const tsi_peer *peer,
+ grpc_auth_context **auth_context) {
/* Check the ALPN. */
const tsi_peer_property *p =
tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL);
if (p == NULL) {
- gpr_log(GPR_ERROR, "Missing selected ALPN property.");
- return GRPC_SECURITY_ERROR;
+ return GRPC_ERROR_CREATE(
+ "Cannot check peer: missing selected ALPN property.");
}
if (!grpc_chttp2_is_alpn_version_supported(p->value.data, p->value.length)) {
- gpr_log(GPR_ERROR, "Invalid ALPN value.");
- return GRPC_SECURITY_ERROR;
+ return GRPC_ERROR_CREATE("Cannot check peer: invalid ALPN value.");
}
/* Check the peer name if specified. */
if (peer_name != NULL && !ssl_host_matches_name(peer, peer_name)) {
- gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", peer_name);
- return GRPC_SECURITY_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "Peer name %s is not in peer certificate", peer_name);
+ grpc_error *error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
}
*auth_context = tsi_ssl_peer_to_auth_context(peer);
- return GRPC_SECURITY_OK;
+ return GRPC_ERROR_NONE;
}
static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
- grpc_security_status status;
- grpc_auth_context *auth_context = NULL;
- status = ssl_check_peer(sc, c->overridden_target_name != NULL
- ? c->overridden_target_name
- : c->target_name,
- &peer, &auth_context);
- cb(exec_ctx, user_data, status, auth_context);
- grpc_auth_context_unref(auth_context);
+ grpc_error *error = ssl_check_peer(sc, c->overridden_target_name != NULL
+ ? c->overridden_target_name
+ : c->target_name,
+ &peer, auth_context);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
tsi_peer_destruct(&peer);
}
static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data) {
- grpc_auth_context *auth_context = NULL;
- grpc_security_status status = ssl_check_peer(sc, NULL, &peer, &auth_context);
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked) {
+ grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context);
tsi_peer_destruct(&peer);
- cb(exec_ctx, user_data, status, auth_context);
- grpc_auth_context_unref(auth_context);
+ grpc_exec_ctx_sched(exec_ctx, on_peer_checked, error, NULL);
}
static void add_shallow_auth_property_to_peer(tsi_peer *peer,
@@ -765,7 +715,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
- c->base.do_handshake = ssl_channel_do_handshake;
+ c->base.add_handshakers = ssl_channel_add_handshakers;
gpr_split_host_port(target_name, &c->target_name, &port);
gpr_free(port);
if (overridden_target_name != NULL) {
@@ -840,8 +790,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
*sc = NULL;
goto error;
}
- gpr_mu_init(&c->base.mu);
- c->base.do_handshake = ssl_server_do_handshake;
+ c->base.add_handshakers = ssl_server_add_handshakers;
*sc = &c->base;
gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index dc02692b01..a84b359051 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -35,6 +35,8 @@
#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_CONNECTOR_H
#include <grpc/grpc_security.h>
+
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/tsi/transport_security_interface.h"
@@ -57,21 +59,11 @@ typedef struct grpc_security_connector grpc_security_connector;
#define GRPC_SECURITY_CONNECTOR_ARG "grpc.security_connector"
-typedef void (*grpc_security_peer_check_cb)(grpc_exec_ctx *exec_ctx,
- void *user_data,
- grpc_security_status status,
- grpc_auth_context *auth_context);
-
-/* Ownership of the secure_endpoint is transfered. */
-typedef void (*grpc_security_handshake_done_cb)(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status,
- grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context);
-
typedef struct {
void (*destroy)(grpc_security_connector *sc);
void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc,
- tsi_peer peer, grpc_security_peer_check_cb cb,
- void *user_data);
+ tsi_peer peer, grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked);
} grpc_security_connector_vtable;
typedef struct grpc_security_connector_handshake_list {
@@ -106,12 +98,12 @@ void grpc_security_connector_unref(grpc_security_connector *policy);
#endif
/* Check the peer. Callee takes ownership of the peer object.
- The callback will include the resulting auth_context. */
+ When done, sets *auth_context and invokes on_peer_checked. */
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
tsi_peer peer,
- grpc_security_peer_check_cb cb,
- void *user_data);
+ grpc_auth_context **auth_context,
+ grpc_closure *on_peer_checked);
/* Util to encapsulate the connector in a channel arg. */
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc);
@@ -141,11 +133,9 @@ struct grpc_channel_security_connector {
grpc_channel_security_connector *sc, const char *host,
grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
- void (*do_handshake)(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
+ void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr);
};
/* Checks that the host that will be set for a call is acceptable. */
@@ -154,11 +144,10 @@ void grpc_channel_security_connector_check_call_host(
const char *host, grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
-/* Handshake. */
-void grpc_channel_security_connector_do_handshake(
+/* Registers handshakers with \a handshake_mgr. */
+void grpc_channel_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
+ grpc_handshake_manager *handshake_mgr);
/* --- server_security_connector object. ---
@@ -169,25 +158,14 @@ typedef struct grpc_server_security_connector grpc_server_security_connector;
struct grpc_server_security_connector {
grpc_security_connector base;
- gpr_mu mu;
- grpc_security_connector_handshake_list *handshaking_handshakes;
- const grpc_channel_args *channel_args;
- void (*do_handshake)(grpc_exec_ctx *exec_ctx,
- grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor,
- grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
+ void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr);
};
-void grpc_server_security_connector_do_handshake(
+void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- grpc_slice_buffer *read_buffer, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
-
-void grpc_server_security_connector_shutdown(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector);
+ grpc_handshake_manager *handshake_mgr);
/* --- Creation security connectors. --- */
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
new file mode 100644
index 0000000000..41a775db85
--- /dev/null
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -0,0 +1,450 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/security/transport/security_handshaker.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/security/context/security_context.h"
+#include "src/core/lib/security/transport/secure_endpoint.h"
+#include "src/core/lib/security/transport/tsi_error.h"
+
+#define GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE 256
+
+typedef struct {
+ grpc_handshaker base;
+
+ // State set at creation time.
+ tsi_handshaker *handshaker;
+ grpc_security_connector *connector;
+
+ gpr_mu mu;
+ gpr_refcount refs;
+
+ bool shutdown;
+ // Endpoint and read buffer to destroy after a shutdown.
+ grpc_endpoint *endpoint_to_destroy;
+ grpc_slice_buffer *read_buffer_to_destroy;
+
+ // State saved while performing the handshake.
+ grpc_handshaker_args *args;
+ grpc_closure *on_handshake_done;
+
+ unsigned char *handshake_buffer;
+ size_t handshake_buffer_size;
+ grpc_slice_buffer left_overs;
+ grpc_slice_buffer outgoing;
+ grpc_closure on_handshake_data_sent_to_peer;
+ grpc_closure on_handshake_data_received_from_peer;
+ grpc_closure on_peer_checked;
+ grpc_auth_context *auth_context;
+} security_handshaker;
+
+static void security_handshaker_unref(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h) {
+ if (gpr_unref(&h->refs)) {
+ gpr_mu_destroy(&h->mu);
+ tsi_handshaker_destroy(h->handshaker);
+ if (h->endpoint_to_destroy != NULL) {
+ grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy);
+ }
+ if (h->read_buffer_to_destroy != NULL) {
+ grpc_slice_buffer_destroy(h->read_buffer_to_destroy);
+ gpr_free(h->read_buffer_to_destroy);
+ }
+ gpr_free(h->handshake_buffer);
+ grpc_slice_buffer_destroy(&h->left_overs);
+ grpc_slice_buffer_destroy(&h->outgoing);
+ GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake");
+ GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake");
+ gpr_free(h);
+ }
+}
+
+// Set args fields to NULL, saving the endpoint and read buffer for
+// later destruction.
+static void cleanup_args_for_failure_locked(security_handshaker *h) {
+ h->endpoint_to_destroy = h->args->endpoint;
+ h->args->endpoint = NULL;
+ h->read_buffer_to_destroy = h->args->read_buffer;
+ h->args->read_buffer = NULL;
+ grpc_channel_args_destroy(h->args->args);
+ h->args->args = NULL;
+}
+
+// If the handshake failed or we're shutting down, clean up and invoke the
+// callback with the error.
+static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h,
+ grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) {
+ // If we were shut down after the handshake succeeded but before an
+ // endpoint callback was invoked, we need to generate our own error.
+ error = GRPC_ERROR_CREATE("Handshaker shutdown");
+ }
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg);
+ grpc_error_free_string(msg);
+ if (!h->shutdown) {
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
+ // Not shutting down, so the write failed. Clean up before
+ // invoking the callback.
+ cleanup_args_for_failure_locked(h);
+ // Set shutdown to true so that subsequent calls to
+ // security_handshaker_shutdown() do nothing.
+ h->shutdown = true;
+ }
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL);
+}
+
+static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ security_handshaker *h = arg;
+ gpr_mu_lock(&h->mu);
+ if (error != GRPC_ERROR_NONE || h->shutdown) {
+ security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error));
+ goto done;
+ }
+ // Get frame protector.
+ tsi_frame_protector *protector;
+ tsi_result result =
+ tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector);
+ if (result != TSI_OK) {
+ error = grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE("Frame protector creation failed"), result);
+ security_handshake_failed_locked(exec_ctx, h, error);
+ goto done;
+ }
+ // Success.
+ // Create secure endpoint.
+ h->args->endpoint = grpc_secure_endpoint_create(
+ protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count);
+ h->left_overs.count = 0;
+ h->left_overs.length = 0;
+ // Clear out the read buffer before it gets passed to the transport,
+ // since any excess bytes were already copied to h->left_overs.
+ grpc_slice_buffer_reset_and_unref(h->args->read_buffer);
+ // Add auth context to channel args.
+ grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context);
+ grpc_channel_args *tmp_args = h->args->args;
+ h->args->args =
+ grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
+ grpc_channel_args_destroy(tmp_args);
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE, NULL);
+ // Set shutdown to true so that subsequent calls to
+ // security_handshaker_shutdown() do nothing.
+ h->shutdown = true;
+done:
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+}
+
+static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h) {
+ tsi_peer peer;
+ tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer);
+ if (result != TSI_OK) {
+ return grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE("Peer extraction failed"), result);
+ }
+ grpc_security_connector_check_peer(exec_ctx, h->connector, peer,
+ &h->auth_context, &h->on_peer_checked);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx,
+ security_handshaker *h) {
+ // Get data to send.
+ tsi_result result = TSI_OK;
+ size_t offset = 0;
+ do {
+ size_t to_send_size = h->handshake_buffer_size - offset;
+ result = tsi_handshaker_get_bytes_to_send_to_peer(
+ h->handshaker, h->handshake_buffer + offset, &to_send_size);
+ offset += to_send_size;
+ if (result == TSI_INCOMPLETE_DATA) {
+ h->handshake_buffer_size *= 2;
+ h->handshake_buffer =
+ gpr_realloc(h->handshake_buffer, h->handshake_buffer_size);
+ }
+ } while (result == TSI_INCOMPLETE_DATA);
+ if (result != TSI_OK) {
+ return grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Handshake failed"),
+ result);
+ }
+ // Send data.
+ grpc_slice to_send =
+ grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset);
+ grpc_slice_buffer_reset_and_unref(&h->outgoing);
+ grpc_slice_buffer_add(&h->outgoing, to_send);
+ grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing,
+ &h->on_handshake_data_sent_to_peer);
+ return GRPC_ERROR_NONE;
+}
+
+static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
+ security_handshaker *h = arg;
+ gpr_mu_lock(&h->mu);
+ if (error != GRPC_ERROR_NONE || h->shutdown) {
+ security_handshake_failed_locked(
+ exec_ctx, h,
+ GRPC_ERROR_CREATE_REFERENCING("Handshake read failed", &error, 1));
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ // Process received data.
+ tsi_result result = TSI_OK;
+ size_t consumed_slice_size = 0;
+ size_t i;
+ for (i = 0; i < h->args->read_buffer->count; i++) {
+ consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]);
+ result = tsi_handshaker_process_bytes_from_peer(
+ h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]),
+ &consumed_slice_size);
+ if (!tsi_handshaker_is_in_progress(h->handshaker)) break;
+ }
+ if (tsi_handshaker_is_in_progress(h->handshaker)) {
+ /* We may need more data. */
+ if (result == TSI_INCOMPLETE_DATA) {
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ goto done;
+ } else {
+ error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ goto done;
+ }
+ }
+ if (result != TSI_OK) {
+ security_handshake_failed_locked(
+ exec_ctx, h, grpc_set_tsi_error_result(
+ GRPC_ERROR_CREATE("Handshake failed"), result));
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ /* Handshake is done and successful this point. */
+ bool has_left_overs_in_current_slice =
+ (consumed_slice_size <
+ GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]));
+ size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) +
+ h->args->read_buffer->count - i - 1;
+ if (num_left_overs > 0) {
+ /* Put the leftovers in our buffer (ownership transfered). */
+ if (has_left_overs_in_current_slice) {
+ grpc_slice_buffer_add(
+ &h->left_overs,
+ grpc_slice_split_tail(&h->args->read_buffer->slices[i],
+ consumed_slice_size));
+ /* split_tail above increments refcount. */
+ grpc_slice_unref(h->args->read_buffer->slices[i]);
+ }
+ grpc_slice_buffer_addn(
+ &h->left_overs, &h->args->read_buffer->slices[i + 1],
+ num_left_overs - (size_t)has_left_overs_in_current_slice);
+ }
+ // Check peer.
+ error = check_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+done:
+ gpr_mu_unlock(&h->mu);
+}
+
+static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ security_handshaker *h = arg;
+ gpr_mu_lock(&h->mu);
+ if (error != GRPC_ERROR_NONE || h->shutdown) {
+ security_handshake_failed_locked(
+ exec_ctx, h,
+ GRPC_ERROR_CREATE_REFERENCING("Handshake write failed", &error, 1));
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ /* We may be done. */
+ if (tsi_handshaker_is_in_progress(h->handshaker)) {
+ grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer,
+ &h->on_handshake_data_received_from_peer);
+ } else {
+ error = check_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ }
+ gpr_mu_unlock(&h->mu);
+}
+
+//
+// public handshaker API
+//
+
+static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {
+ security_handshaker *h = (security_handshaker *)handshaker;
+ security_handshaker_unref(exec_ctx, h);
+}
+
+static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {
+ security_handshaker *h = (security_handshaker *)handshaker;
+ gpr_mu_lock(&h->mu);
+ if (!h->shutdown) {
+ h->shutdown = true;
+ grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
+ cleanup_args_for_failure_locked(h);
+ }
+ gpr_mu_unlock(&h->mu);
+}
+
+static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker,
+ grpc_tcp_server_acceptor *acceptor,
+ grpc_closure *on_handshake_done,
+ grpc_handshaker_args *args) {
+ security_handshaker *h = (security_handshaker *)handshaker;
+ gpr_mu_lock(&h->mu);
+ h->args = args;
+ h->on_handshake_done = on_handshake_done;
+ gpr_ref(&h->refs);
+ grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h);
+ if (error != GRPC_ERROR_NONE) {
+ security_handshake_failed_locked(exec_ctx, h, error);
+ gpr_mu_unlock(&h->mu);
+ security_handshaker_unref(exec_ctx, h);
+ return;
+ }
+ gpr_mu_unlock(&h->mu);
+}
+
+static const grpc_handshaker_vtable security_handshaker_vtable = {
+ security_handshaker_destroy, security_handshaker_shutdown,
+ security_handshaker_do_handshake};
+
+static grpc_handshaker *security_handshaker_create(
+ grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
+ grpc_security_connector *connector) {
+ security_handshaker *h = gpr_malloc(sizeof(security_handshaker));
+ memset(h, 0, sizeof(security_handshaker));
+ grpc_handshaker_init(&security_handshaker_vtable, &h->base);
+ h->handshaker = handshaker;
+ h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
+ gpr_mu_init(&h->mu);
+ gpr_ref_init(&h->refs, 1);
+ h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
+ h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
+ grpc_closure_init(&h->on_handshake_data_sent_to_peer,
+ on_handshake_data_sent_to_peer, h);
+ grpc_closure_init(&h->on_handshake_data_received_from_peer,
+ on_handshake_data_received_from_peer, h);
+ grpc_closure_init(&h->on_peer_checked, on_peer_checked, h);
+ grpc_slice_buffer_init(&h->left_overs);
+ grpc_slice_buffer_init(&h->outgoing);
+ return &h->base;
+}
+
+//
+// fail_handshaker
+//
+
+static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {
+ gpr_free(handshaker);
+}
+
+static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker) {}
+
+static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
+ grpc_handshaker *handshaker,
+ grpc_tcp_server_acceptor *acceptor,
+ grpc_closure *on_handshake_done,
+ grpc_handshaker_args *args) {
+ grpc_exec_ctx_sched(exec_ctx, on_handshake_done,
+ GRPC_ERROR_CREATE("Failed to create security handshaker"),
+ NULL);
+}
+
+static const grpc_handshaker_vtable fail_handshaker_vtable = {
+ fail_handshaker_destroy, fail_handshaker_shutdown,
+ fail_handshaker_do_handshake};
+
+static grpc_handshaker *fail_handshaker_create() {
+ grpc_handshaker *h = gpr_malloc(sizeof(*h));
+ grpc_handshaker_init(&fail_handshaker_vtable, h);
+ return h;
+}
+
+//
+// exported functions
+//
+
+grpc_handshaker *grpc_security_handshaker_create(
+ grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
+ grpc_security_connector *connector) {
+ // If no TSI handshaker was created, return a handshaker that always fails.
+ // Otherwise, return a real security handshaker.
+ if (handshaker == NULL) {
+ return fail_handshaker_create();
+ } else {
+ return security_handshaker_create(exec_ctx, handshaker, connector);
+ }
+}
diff --git a/src/core/lib/security/transport/handshake.h b/src/core/lib/security/transport/security_handshaker.h
index f894540515..5ddbf4b451 100644
--- a/src/core/lib/security/transport/handshake.h
+++ b/src/core/lib/security/transport/security_handshaker.h
@@ -31,20 +31,16 @@
*
*/
-#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H
-#define GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H
+#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
+#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
-#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/security/transport/security_connector.h"
-/* Calls the callback upon completion. Takes owership of handshaker and
- * read_buffer. */
-void grpc_do_security_handshake(
+/// Creates a security handshaker using \a handshaker.
+grpc_handshaker *grpc_security_handshaker_create(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
- grpc_security_connector *connector, bool is_client_side,
- grpc_endpoint *nonsecure_endpoint, grpc_slice_buffer *read_buffer,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
+ grpc_security_connector *connector);
-void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, void *handshake);
-
-#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_HANDSHAKE_H */
+#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */
diff --git a/src/core/lib/support/backoff.c b/src/core/lib/support/backoff.c
index e89ef47220..0612472712 100644
--- a/src/core/lib/support/backoff.c
+++ b/src/core/lib/support/backoff.c
@@ -35,8 +35,10 @@
#include <grpc/support/useful.h>
-void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter,
+void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
+ double multiplier, double jitter,
int64_t min_timeout_millis, int64_t max_timeout_millis) {
+ backoff->initial_connect_timeout = initial_connect_timeout;
backoff->multiplier = multiplier;
backoff->jitter = jitter;
backoff->min_timeout_millis = min_timeout_millis;
@@ -45,9 +47,10 @@ void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter,
}
gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) {
- backoff->current_timeout_millis = backoff->min_timeout_millis;
- return gpr_time_add(
- now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN));
+ backoff->current_timeout_millis = backoff->initial_connect_timeout;
+ const int64_t first_timeout =
+ GPR_MAX(backoff->current_timeout_millis, backoff->min_timeout_millis);
+ return gpr_time_add(now, gpr_time_from_millis(first_timeout, GPR_TIMESPAN));
}
/* Generate a random number between 0 and 1. */
@@ -57,20 +60,28 @@ static double generate_uniform_random_number(uint32_t *rng_state) {
}
gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) {
- double new_timeout_millis =
+ const double new_timeout_millis =
backoff->multiplier * (double)backoff->current_timeout_millis;
- double jitter_range = backoff->jitter * new_timeout_millis;
- double jitter =
+ backoff->current_timeout_millis =
+ GPR_MIN((int64_t)new_timeout_millis, backoff->max_timeout_millis);
+
+ const double jitter_range_width = backoff->jitter * new_timeout_millis;
+ const double jitter =
(2 * generate_uniform_random_number(&backoff->rng_state) - 1) *
- jitter_range;
+ jitter_range_width;
+
backoff->current_timeout_millis =
- GPR_CLAMP((int64_t)(new_timeout_millis + jitter),
- backoff->min_timeout_millis, backoff->max_timeout_millis);
- return gpr_time_add(
+ (int64_t)((double)(backoff->current_timeout_millis) + jitter);
+
+ const gpr_timespec current_deadline = gpr_time_add(
now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN));
+
+ const gpr_timespec min_deadline = gpr_time_add(
+ now, gpr_time_from_millis(backoff->min_timeout_millis, GPR_TIMESPAN));
+
+ return gpr_time_max(current_deadline, min_deadline);
}
void gpr_backoff_reset(gpr_backoff *backoff) {
- // forces step() to return a timeout of min_timeout_millis
- backoff->current_timeout_millis = 0;
+ backoff->current_timeout_millis = backoff->initial_connect_timeout;
}
diff --git a/src/core/lib/support/backoff.h b/src/core/lib/support/backoff.h
index 6d40c15546..5e9b740824 100644
--- a/src/core/lib/support/backoff.h
+++ b/src/core/lib/support/backoff.h
@@ -37,7 +37,9 @@
#include <grpc/support/time.h>
typedef struct {
- /// const: multiplier between retry attempts
+ /// const: how long to wait after the first failure before retrying
+ int64_t initial_connect_timeout;
+ /// const: factor with which to multiply backoff after a failed retry
double multiplier;
/// const: amount to randomize backoffs
double jitter;
@@ -54,7 +56,8 @@ typedef struct {
} gpr_backoff;
/// Initialize backoff machinery - does not need to be destroyed
-void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter,
+void gpr_backoff_init(gpr_backoff *backoff, int64_t initial_connect_timeout,
+ double multiplier, double jitter,
int64_t min_timeout_millis, int64_t max_timeout_millis);
/// Begin retry loop: returns a timespec for the NEXT retry
diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c
index daf371d03e..4247a1c12b 100644
--- a/src/core/lib/support/subprocess_posix.c
+++ b/src/core/lib/support/subprocess_posix.c
@@ -98,7 +98,8 @@ retry:
if (errno == EINTR) {
goto retry;
}
- gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid,
+ strerror(errno));
return -1;
}
p->joined = true;
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 1e0f3eeca5..8ca3cab9d5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1551,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
+ /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
+ from server.c. In that case, it's coming from accept_stream, and in
+ that case we're not necessarily covered by a poller. */
+ stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 4e0feb56ac..184c1a1a16 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -354,11 +354,13 @@ static void dump_pending_tags(grpc_completion_queue *cc) {
gpr_strvec v;
gpr_strvec_init(&v);
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
+ gpr_mu_lock(cc->mu);
for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
char *s;
gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
+ gpr_mu_unlock(cc->mu);
char *out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 89072879d9..4f49d7cf7d 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -100,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker *connectivity_state) {
+ return connectivity_state->watchers != NULL;
+}
+
+bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) {
if (grpc_connectivity_state_trace) {
@@ -119,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
tracker->watchers = w->next;
gpr_free(w);
- return 0;
+ return false;
}
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
@@ -127,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
w->next = w->next->next;
gpr_free(rm_candidate);
- return 0;
+ return false;
}
w = w->next;
}
- return 0;
+ return false;
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 7a2fa52c10..769c675b79 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_error *associated_error,
const char *reason);
+bool grpc_connectivity_state_has_watchers(
+ grpc_connectivity_state_tracker *tracker);
+
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
/** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
case) */
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify);