diff options
author | David Garcia Quintas <dgq@google.com> | 2016-10-14 00:07:31 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-10-14 00:07:31 -0700 |
commit | fd7caf570843be6f2cf180e136180f15de2c1dbe (patch) | |
tree | 1179d4290571255cac201ba40f05119c93a5820a /src/core/ext | |
parent | c5986021b98cf9bc95496f6d7af9b39864483c2c (diff) | |
parent | 2f27a16a6bfaea7c81024ad8ee377ff61216c520 (diff) |
Merge branch 'master' of github.com:grpc/grpc into new_lb_md_keys
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 18 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c | 196 |
2 files changed, 115 insertions, 99 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 0442c864f5..deda2600e0 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -329,8 +329,8 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, if (server->port >> 16 != 0) { if (log) { gpr_log(GPR_ERROR, - "Invalid port '%d' at index %zu of serverlist. Ignoring.", - server->port, idx); + "Invalid port '%d' at index %lu of serverlist. Ignoring.", + server->port, (unsigned long)idx); } return false; } @@ -338,9 +338,9 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, if (ip->size != 4 && ip->size != 16) { if (log) { gpr_log(GPR_ERROR, - "Expected IP to be 4 or 16 bytes, got %d at index %zu of " + "Expected IP to be 4 or 16 bytes, got %d at index %lu of " "serverlist. Ignoring", - ip->size, idx); + ip->size, (unsigned long)idx); } return false; } @@ -1070,8 +1070,8 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { if (serverlist != NULL) { gpr_slice_unref(response_slice); if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Serverlist with %zu servers received", - serverlist->num_servers); + gpr_log(GPR_INFO, "Serverlist with %lu servers received", + (unsigned long)serverlist->num_servers); } /* update serverlist */ @@ -1155,10 +1155,10 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "status from lb server received. Status = %d, Details = '%s', " - "Capaticy " - "= %zu", + "Capacity " + "= %lu", lb_client->status, lb_client->status_details, - lb_client->status_details_capacity); + (unsigned long)lb_client->status_details_capacity); } /* TODO(dgq): deal with stream termination properly (fire up another one? * fail the original call?) */ diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index da3e284fcf..563271f4f8 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -61,13 +61,12 @@ typedef struct server_secure_state { grpc_server_credentials *creds; bool is_shutdown; gpr_mu mu; - gpr_refcount refcount; - grpc_closure destroy_closure; - grpc_closure *destroy_callback; + grpc_closure tcp_server_shutdown_complete; + grpc_closure *server_destroy_listener_done; } server_secure_state; typedef struct server_secure_connect { - server_secure_state *state; + server_secure_state *server_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; @@ -77,39 +76,28 @@ typedef struct server_secure_connect { grpc_channel_args *args; } server_secure_connect; -static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } - -static void state_unref(server_secure_state *state) { - if (gpr_unref(&state->refcount)) { - /* ensure all threads have unlocked */ - gpr_mu_lock(&state->mu); - gpr_mu_unlock(&state->mu); - /* clean up */ - GRPC_SECURITY_CONNECTOR_UNREF(&state->sc->base, "server"); - grpc_server_credentials_unref(state->creds); - gpr_free(state); - } -} - static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, grpc_security_status status, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { - server_secure_connect *state = statep; + server_secure_connect *connection_state = statep; if (status == GRPC_SECURITY_OK) { if (secure_endpoint) { - gpr_mu_lock(&state->state->mu); - if (!state->state->is_shutdown) { + gpr_mu_lock(&connection_state->server_state->mu); + if (!connection_state->server_state->is_shutdown) { grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(state->state->server), + exec_ctx, grpc_server_get_channel_args( + connection_state->server_state->server), secure_endpoint, 0); grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds); + args_to_add[0] = grpc_server_credentials_to_arg( + connection_state->server_state->creds); args_to_add[1] = grpc_auth_context_to_arg(auth_context); grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( - state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport(exec_ctx, state->state->server, transport, - state->accepting_pollset, args_copy); + connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, args_copy); grpc_channel_args_destroy(args_copy); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); } else { @@ -117,21 +105,21 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, * gone away. */ grpc_endpoint_destroy(exec_ctx, secure_endpoint); } - gpr_mu_unlock(&state->state->mu); + gpr_mu_unlock(&connection_state->server_state->mu); } } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } - grpc_channel_args_destroy(state->args); - state_unref(state->state); - gpr_free(state); + grpc_channel_args_destroy(connection_state->args); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); + gpr_free(connection_state); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_channel_args *args, gpr_slice_buffer *read_buffer, void *user_data, grpc_error *error) { - server_secure_connect *state = user_data; + server_secure_connect *connection_state = user_data; if (error != GRPC_ERROR_NONE) { const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); @@ -139,81 +127,107 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, GRPC_ERROR_UNREF(error); grpc_channel_args_destroy(args); gpr_free(read_buffer); - grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - state_unref(state->state); - gpr_free(state); + grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); + gpr_free(connection_state); return; } - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - state->handshake_mgr = NULL; + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + connection_state->handshake_mgr = NULL; // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() // into this function. - state->args = args; + connection_state->args = args; grpc_server_security_connector_do_handshake( - exec_ctx, state->state->sc, state->acceptor, endpoint, read_buffer, - state->deadline, on_secure_handshake_done, state); + exec_ctx, connection_state->server_state->sc, connection_state->acceptor, + endpoint, read_buffer, connection_state->deadline, + on_secure_handshake_done, connection_state); } static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - server_secure_connect *state = gpr_malloc(sizeof(*state)); - state->state = statep; - state_ref(state->state); - state->accepting_pollset = accepting_pollset; - state->acceptor = acceptor; - state->handshake_mgr = grpc_handshake_manager_create(); + server_secure_state *server_state = statep; + server_secure_connect *connection_state = NULL; + gpr_mu_lock(&server_state->mu); + if (server_state->is_shutdown) { + gpr_mu_unlock(&server_state->mu); + grpc_endpoint_destroy(exec_ctx, tcp); + return; + } + gpr_mu_unlock(&server_state->mu); + grpc_tcp_server_ref(server_state->tcp); + connection_state = gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = server_state; + connection_state->accepting_pollset = accepting_pollset; + connection_state->acceptor = acceptor; + connection_state->handshake_mgr = grpc_handshake_manager_create(); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. - state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(120, GPR_TIMESPAN)); + connection_state->deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake( - exec_ctx, state->handshake_mgr, tcp, - grpc_server_get_channel_args(state->state->server), state->deadline, - acceptor, on_handshake_done, state); + exec_ctx, connection_state->handshake_mgr, tcp, + grpc_server_get_channel_args(connection_state->server_state->server), + connection_state->deadline, acceptor, on_handshake_done, + connection_state); } /* Server callback: start listening on our ports */ -static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, - grpc_pollset **pollsets, size_t pollset_count) { - server_secure_state *state = statep; - grpc_tcp_server_start(exec_ctx, state->tcp, pollsets, pollset_count, - on_accept, state); +static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, + void *statep, grpc_pollset **pollsets, + size_t pollset_count) { + server_secure_state *server_state = statep; + gpr_mu_lock(&server_state->mu); + server_state->is_shutdown = false; + gpr_mu_unlock(&server_state->mu); + grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count, + on_accept, server_state); } -static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, - grpc_error *error) { - server_secure_state *state = statep; - if (state->destroy_callback != NULL) { - state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, - GRPC_ERROR_REF(error)); +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep, + grpc_error *error) { + server_secure_state *server_state = statep; + /* ensure all threads have unlocked */ + gpr_mu_lock(&server_state->mu); + grpc_closure *destroy_done = server_state->server_destroy_listener_done; + GPR_ASSERT(server_state->is_shutdown); + gpr_mu_unlock(&server_state->mu); + /* clean up */ + grpc_server_security_connector_shutdown(exec_ctx, server_state->sc); + + /* Flush queued work before a synchronous unref. */ + grpc_exec_ctx_flush(exec_ctx); + GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server"); + grpc_server_credentials_unref(server_state->creds); + + if (destroy_done != NULL) { + destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); + grpc_exec_ctx_flush(exec_ctx); } - grpc_server_security_connector_shutdown(exec_ctx, state->sc); - state_unref(state); + gpr_free(server_state); } -/* Server callback: destroy the tcp listener (so we don't generate further - callbacks) */ -static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, - grpc_closure *callback) { - server_secure_state *state = statep; +static void server_destroy_listener(grpc_exec_ctx *exec_ctx, + grpc_server *server, void *statep, + grpc_closure *callback) { + server_secure_state *server_state = statep; grpc_tcp_server *tcp; - gpr_mu_lock(&state->mu); - state->is_shutdown = true; - state->destroy_callback = callback; - tcp = state->tcp; - gpr_mu_unlock(&state->mu); + gpr_mu_lock(&server_state->mu); + server_state->is_shutdown = true; + server_state->server_destroy_listener_done = callback; + tcp = server_state->tcp; + gpr_mu_unlock(&server_state->mu); grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, tcp); + grpc_tcp_server_unref(exec_ctx, server_state->tcp); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp = NULL; - server_secure_state *state = NULL; + server_secure_state *server_state = NULL; size_t i; size_t count = 0; int port_num = -1; @@ -253,22 +267,22 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, if (err != GRPC_ERROR_NONE) { goto error; } - state = gpr_malloc(sizeof(*state)); - memset(state, 0, sizeof(*state)); - grpc_closure_init(&state->destroy_closure, destroy_done, state); - err = grpc_tcp_server_create(&state->destroy_closure, + server_state = gpr_malloc(sizeof(*server_state)); + memset(server_state, 0, sizeof(*server_state)); + grpc_closure_init(&server_state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, server_state); + err = grpc_tcp_server_create(&server_state->tcp_server_shutdown_complete, grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; } - state->server = server; - state->tcp = tcp; - state->sc = sc; - state->creds = grpc_server_credentials_ref(creds); - state->is_shutdown = false; - gpr_mu_init(&state->mu); - gpr_ref_init(&state->refcount, 1); + server_state->server = server; + server_state->tcp = tcp; + server_state->sc = sc; + server_state->creds = grpc_server_credentials_ref(creds); + server_state->is_shutdown = true; + gpr_mu_init(&server_state->mu); errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); for (i = 0; i < resolved->naddrs; i++) { @@ -313,7 +327,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, state, start, destroy); + grpc_server_add_listener(&exec_ctx, server, server_state, + server_start_listener, server_destroy_listener); grpc_exec_ctx_finish(&exec_ctx); return port_num; @@ -334,10 +349,11 @@ error: grpc_tcp_server_unref(&exec_ctx, tcp); } else { if (sc) { + grpc_exec_ctx_flush(&exec_ctx); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server"); } - if (state) { - gpr_free(state); + if (server_state) { + gpr_free(server_state); } } grpc_exec_ctx_finish(&exec_ctx); |